线程 vs 协程
并发编程是整个计算机科学中相当复杂的领域之一,它们往往涉及线程与锁,进而引入一系列 状态爆炸 、数据竞争、死锁、不确定 bug、回调噩梦 等问题。
我很早就关注协程这一技术,业界诸如微信使用协程服务于上亿用户。在今年部门举办的编程大赛中,给了我使用协程这一“新技术”编写服务端的机会。本文将以这个游戏服务端为例介绍协程的概念与使用,它能够并发处理数个游戏客户端的对抗。
早在上世纪 50 年代便出现了协程这一概念,发扬光大却在 21 世纪后,它的流行大大简化了编写并发程序的难度,并且提高了程序的健壮性。
尽管协程的思想和语言无关,但它们需要语言特性级别的支持,仅仅通过框架实现协程,或多或少很蹩脚。
在 C++20 中便支持了协程特性,而它是作为框架开发者使用的特性,由框架开发者构造一些协程库,并提供给用户使用。一些协程标准库的支持需要到 C++23 中。
越来越多的现代语言都支持协程这一机制,例如 Python 3.7 起开始支持协程,还有 C#、Go、NodeJs 等都提供了一定程度的支持。我最终选型使用 Python 作为开发语言,标准库中的 asyncio
足够完成这一任务,并且开发效率足够高。
协程是并发编程的一种模式,相对于多线程而言,它通常以单线程的调度器为根基,并发调度不同的协程,以完成复杂的任务。因此使用协程的好处是可以做到无锁并发编程,没有死锁,没有回调,没有竞争,也就无需大量的状态,在这种环境下编写程序,关注点更聚焦,降低了编程的心智负担。
可能有人会觉得反直觉,单线程如何做到并发?并发从微观上看分时复用的,而从宏观角度看是同时进行的。早在单核时代便支持了线程机制,使用线程做并发,由操作系统进行调度实现并发。即便是单核,你也摆脱不了锁,因为一旦你是用了多线程编程,就不能对核数有任何假设,或许哪天你的机器升级后程序就异常了。
使用协程没有这个问题,协程调度器充当了操作系统协程调度的角色,因此操作系统不感知上层应用的情况,也就不会涉及昂贵的上下文切换代价。
从用户角度而言,使用线程,你的执行过程随时可能被切换,导致你不得不加锁、状态;使用协程,你的协程切换由你决定(使用 await
关键字)。这便是线程与协程的核心区别:前者切换权通常在操作系统上,后者切换权在用户手上(抢占与非抢占)。
定义协程
协程相比普通的过程(函数)而言,多了两个动作,主动挂起(await
)与恢复(resume
)。
其中挂起由用户发起,它通知调度器当前协程在异步等待(async wait)一个结果,要求调度器调度下一个协程。简而言之,每一个 await
点便是协程的一个潜在切换点。
恢复是由调度器执行的,当挂起后的协程收到结果后,由调度器恢复协程的控制流。因此对于用户而言,只需要关注 await
动作。
正因为它比过程多了两个动作,编程语言、框架需要区分一个协程与一个函数,通常在定义协程的时候使用关键字 async
修饰(C++20 通过返回类型区分协程与普通函数)。
在我设计与实现的服务端中,使用 TCP socket,选择 TCP 是因为它是可靠的字节流传输,但它不像 UDP 那样以报文为单位,因此自定义的通讯消息需要接收端自行组包。
那么就这涉及到一个问题,如何区分一个消息的边界,以及组包?通讯协议参考了 18 年软件大赛的方式,消息使用 json 数据格式,并在前 5 个字节标明了 json 数据的长度。
00065{"msg_type": "registration", "msg_data": {"player_name": "test"}}
使用 json 的一个好处在于,一来它是结构化的文本数据,相对于二进制数据无需关注大小端问题;二来它与编程语言无关(虽然起源于 js),因此可以支持任意编程语言客户端的参与。
async def load_message(reader: asyncio.StreamReader):
length = (await reader.read(5)).decode()
if not length: raise Disconnected()
return await reader.readexactly(int(length))
load_message
定义的时候通过 async
修饰,表示它是一个协程,因此也就支持 await
动作。
消除回调
Linux 的 io 一般都是同步 IO,例如 read
操作要么阻塞调用者,要么非阻塞调用者,无论如何都需要调用者亲自去获取结果:前者通过阻塞调用者得到结果,后者需要调用者通过轮询方式。
这就是同步与异步的差别,异步无需调用者去获取结果,它是被动的接受结果。当然将同步 io 改成异步 io 也很简单,起一些线程专门做这件事,并通过回调通知,这是很典型的编程模式。异步使得回调的存在,回调又可能进一步导致竞争问题。
在协程 load_message
通过 await read
表明异步地等待 read 的结果,并通知调度器调度下一个协程,例如这里的 read
就是一个协程,调度器每次根据调度策略来决定是调度其他协程,还是轮询 io 结果等。一旦有了轮询的结果,恢复协程 load_message
的控制流。
load_message
没有回调,也没有被阻塞,整个过程非常连贯,以直观的序列模型编写异步代码。
消除状态机
对于这个服务端而言,可能有人觉得即便不用协程,直接使用单线程,并使用 epoll/select
等接口监听是否收到消息,收到消息后分发给消息处理函数也能完成整个任务,这种方式是经典的事件驱动、Reactor 模式。
然而这个分发过程涉及到状态机,例如收到客户端的一条消息,然后根据消息进入状态机所设定的一系列处理函数。
如果一个完整的行为涉及消息种类比较多,且大量的消息交互,那么需要维护一个巨大的状态机,而状态的迁移却被割裂在代码各个地方,无法看透一个完整的流程。
状态模型和序列模型都能较好的反映一件事情的本质,但不同的是,序列模型更加专注于目标系统的行为,而状态模型更关注被操作对象的状态迁移。
但从实现的角度来看,如果你选择序列模型,当然,你需要实现目标系统的行为;而如果你选择状态模型,那么除了需要管理状态机之外,你仍然需要处理目标系统的行为。这就意味着,在实现层面,一旦选择状态模型,就需要做许多额外的工作。
因此,为了在代码层面理解一个状态机的设计,必须仔细的阅读相关代码,并在不同代码间来回跳转(因为状态一直在跳转),才能理解一个状态机的全貌。
对于的复杂的系统而言,由于各种并发,及并发的丰富组合,要么会导致状态的急剧膨胀,以至于状态机及其的晦涩,难以理解和维护。
以这个游戏服务端为例,一个完整的过程涉及三大部分:用户注册、游戏开始、游戏结束。使用协程可以很好的看到整个过程,无需状态机。
async def handle_connection(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
try:
player = await game_service.handle_registration(reader, writer) # 注册
await game_service.game_loop(player, reader, writer) # 游戏开始
await game_service.handle_game_over(player, writer) # 游戏结束
except Disconnected:
logger.info("peer closed connection")
except StatusError:
logger.info("current game isn't registration state")
except ErrMsgType as err:
logger.warn("receive error msg type: {}".format(err._type))
finally:
logger.info("disconnect client")
writer.close()
async def game_loop(self, player: Player,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
while self._round <= MAX_ROUND:
writer.write(self.inquire(player)) # 向客户端发送查询消息
await writer.drain()
await self._handle_action(player, reader) # 客户端回复游戏指令
self._checkout_actions()
如无必要,请勿引入多余的状态。
消除锁
既然使用协程,那么可以进行无锁编程,应用程序无需再考虑死锁、竞争问题。当指定的客户端数量注册就绪后,游戏将进入开始阶段。开始游戏后,如果有其他客户端连接,那么会拒绝接入,因此需要一个状态表明当前游戏的阶段。
class GameService:
ST_REGISTRATION = 1
ST_STARTED = 2
在所有的客户端从注册状态并发进入开始状态,对状态的置位无需加锁,即便与此同时可能会存在新的客户端接入对此状态的判断。
@serialize_json
def game_start(self, player: Player): # 所有客户端都会并发进入该函数
def _start():
if self._status == GameService.ST_STARTED: return # 状态判断
logger.info("current players: {}".format(len(self._players)))
self._game_map.alloca_object_id(self._players)
self._game_map.dump()
self._status = GameService.ST_STARTED # 状态置位
_start()
return {...};
game_start
是一个普通的函数,在函数结束前都不会被切换,它独占着整个调度器的线程,好在这个函数没有耗时的操作,也没有多余的等待。
无锁编程从根本上避免了死锁的问题。
同步原语
前面提到,在指定数量的客户端都注册后,游戏才进入开始阶段。因此需要处理客户端的协程等待,等到数量集齐后,才同步执行后边的流程。
因此协程需要提供一定的同步手段,和传统的并发编程一样,它提供了条件变量等同步原语。
async def handle_registration(self, player_id: int,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
if self._status != GameService.ST_REGISTRATION: raise StatusError
message = await load_message(reader)
player = await self.registration(message, player_id)
async with self._players_cond: # 条件变量做同步
await self._players_cond.wait_for(lambda:
len(self._players) >= MAX_PLAYER_NUM) # 异步地等待所有客户端集齐
writer.write(self.game_start(player)) # 给客户端发送开始消息
await writer.drain()
return player
另一个比较有意思的设计,每回合服务端收集所有客户端的指令,并在这个回合内依次结算。这就要求所有客户端在指定时间内响应,提前收集完则提前进入结算阶段,或者直到有一方超时,将只处理收到的指令。如何实现?
我们可以异步等待一个接受客户端指令的协程,并要求在指定时间内响应,否则抛出超时异常。与此同时,我们不能一收到客户端的指令就处理,还得在指定时间内收集完其他客户端的指令。因此可以起一个协程专门等待是否收集完所有指令,并同时(concurrent/gather 语义)等待它们。
async def _handle_action(self, player: Player, reader: asyncio.StreamReader):
@parse_json
def action(data):
return data
async def _waitAction(): # 指定时间内收集当前客户端的指令,否则超时异常
message = await asyncio.wait_for(load_message(reader),
ACTION_TIME_OUT)
self._actions.append((player, action(message))) # 指令暂存
async with self._actions_cond:
self._actions_cond.notify_all() # 同步原语
async def _waitAllAction(): # 同时收集所有客户端的指令,超时正常返回
try:
async with self._actions_cond:
await asyncio.wait_for(
self._actions_cond.wait_for(lambda:
len(self._actions) == len(self._players)),
ACTION_TIME_OUT + 1e-3) # 条件变量,判断是否集齐指令
except asyncio.TimeoutError:
logger.warn("wait all action timeout, checking actions...")
try:
# 同时等待这两个协程的结果
await asyncio.gather(_waitAction(), _waitAllAction())
except asyncio.TimeoutError:
player._timeout_times += 1
logger.warn(f"player {player._name} action timeout!");
logger.warn(f"(times: {player._timeout_times})")
同步原语是必须的,当它们并不是为了防止资源竞争。
注意事项
协程不能有任何阻塞的行为,因为它使得调度器被白白浪费在空等的协程上,使得其他协程没机会被调度到,调度效率也大大降低了。
因此协程使用的 io 接口都是协程化的,这些 io 协程在系统调用封装之上,避免了阻塞和等待。在这个游戏服务器里边需要服务端自行退出,当游戏达到结束状态时,如何自行退出?
async def main():
robot_server.game_service = robot_server.GameService(game_map)
server = await asyncio.start_server(robot_server.handle_connection,
"0.0.0.0", 6789)
async with server:
await server.serve_forever() # 如何在指定条件下结束?
asyncio
文档中提到,需要 cancel
掉server_forever
协程,才能正确的退出。
coroutine serve_forever()
Start accepting connections until the coroutine is cancelled. Cancellation of serve_forever task causes the server to be closed.
因此可以并发起一个协程在背后检测游戏服务端的状态,以便结束的时候对上述协程进行cancel
。
async with server:
forever_task = asyncio.create_task(server.serve_forever())
async def check_exit():
while robot_server.game_service._status != GameService.ST_END:
await asyncio.sleep(0) # 必须的
forever_task.cancel()
asyncio.create_task(check_exit())
try:
await forever_task
except asyncio.CancelledError:
print("game server exit...")
在协程 check_exit
是一个 while
循环,循环体内使用了 await sleep(0)
,sleep
本身就是一个协程,它表明主动放弃控制权,让调度器调度其他协程。如果不使用await
,那么这个协程将会一直运行,其他协程没有任何执行的可能。
sleep() always suspends the current task, allowing other tasks to run.
Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.
除了 asyncio
提供对系统 io 封装后的协程,用户可能会遇到一些真正耗时的计算协程,或者其他阻塞调用,这时候可以通过库提供的线程池、进程池接口run_in_executor
,将这些耗时任务扔到线程池、进程池上运行,以避免阻塞调度线程。
其他抽象手段
异常处理
传统的错误码方式虽然执行效率高,但每一条语句面临着判断错误码、满屏代码中有一半的代码都在判断错误的尴尬场景。这也是开发效率与执行效率的权衡。
介于错误码方式与异常方式之间的手段还有 Result Monad 抽象,但它不是本节讨论的范围。
使用异常处理的代码更聚焦于逻辑,没有其他的噪音,你无需在编写代码的时候考虑异常,因为异常关注点被分离,要么你专注于处理异常,要么你专注于编写逻辑。
async def handle_connection(reader: asyncio.StreamReader,
writer: asyncio.StreamWriter):
try:
player = await game_service.handle_registration(reader, writer) # 注册
await game_service.game_loop(player, reader, writer) # 游戏开始
await game_service.handle_game_over(player, writer) # 游戏结束
except Disconnected:
logger.info("peer closed connection")
except StatusError:
logger.info("current game isn't registration state")
except ErrMsgType as err:
logger.warn("receive error msg type: {}".format(err._type))
finally:
logger.info("disconnect client")
writer.close()
如果条件允许,请使用异常处理异常。
装饰器设计模式
装饰器设计模式可以在不改变功能的前提下扩展功能,例如要求在每个函数调用前、后分别执行其他动作。
对于每次收到数据包,解析动作可以被分离出去,函数只关注对数据的处理,可以在对数据处理之前,使用装饰器进行解析,然后扔给处理函数。发送数据包也是类似的。
Python 自带了装饰器特性,因此无需我们编写一大堆样板代码。装饰器是一个高阶函数,接受一个被装饰的函数。
def serialize_json(func): # 定义装饰器,装饰函数 func
def wrapper(*argv, **kw):
buf = json.dumps({
"msg_type": func.__name__,
"msg_data": func(*argv, **kw)
})
packed = "{:05d}{}".format(len(buf), buf).encode()
logger.info(f'packed message `{packed}`')
return packed
return wrapper
@serialize_json
def game_over():
return [{
"team_name": team._name,
"shield_num": team._registry._shield_num
} for team in self._teams]
在 game_over
函数执行后,对函数返回的数据进行序列化。
工程集成
最终服务端需要上线,以便参赛者能够参与进来。假设有 100 个参赛者,每局一个玩家且需要 1 分钟的比赛,那么需要 100 分钟才能跑完一轮。而服务器是 192 核的,它能并行处理 192 个任务,如何提高服务器效率?
我的答案是使用传统的 Makefile 方式,依赖关系是 参赛者 -> 比赛日志,参赛者之间没有任何依赖,因此可以利用多核的优势并行进行,可能在 1 分钟内便能得到上百场比赛的结果。
make -f wildcard.mk all