ROOT
ROOT
文章目录
  1. 线程 vs 协程
  2. 定义协程
    1. 消除回调
    2. 消除状态机
    3. 消除锁
    4. 同步原语
    5. 注意事项
  3. 其他抽象手段
    1. 异常处理
    2. 装饰器设计模式
  4. 工程集成
  5. 参考链接

异步并发编程实践之协程

线程 vs 协程

并发编程是整个计算机科学中相当复杂的领域之一,它们往往涉及线程与锁,进而引入一系列 状态爆炸 、数据竞争、死锁、不确定 bug、回调噩梦 等问题。

我很早就关注协程这一技术,业界诸如微信使用协程服务于上亿用户。在今年部门举办的编程大赛中,给了我使用协程这一“新技术”编写服务端的机会。本文将以这个游戏服务端为例介绍协程的概念与使用,它能够并发处理数个游戏客户端的对抗。

早在上世纪 50 年代便出现了协程这一概念,发扬光大却在 21 世纪后,它的流行大大简化了编写并发程序的难度,并且提高了程序的健壮性。

尽管协程的思想和语言无关,但它们需要语言特性级别的支持,仅仅通过框架实现协程,或多或少很蹩脚。

在 C++20 中便支持了协程特性,而它是作为框架开发者使用的特性,由框架开发者构造一些协程库,并提供给用户使用。一些协程标准库的支持需要到 C++23 中。

越来越多的现代语言都支持协程这一机制,例如 Python 3.7 起开始支持协程,还有 C#、Go、NodeJs 等都提供了一定程度的支持。我最终选型使用 Python 作为开发语言,标准库中的 asyncio 足够完成这一任务,并且开发效率足够高。

协程是并发编程的一种模式,相对于多线程而言,它通常以单线程的调度器为根基,并发调度不同的协程,以完成复杂的任务。因此使用协程的好处是可以做到无锁并发编程,没有死锁,没有回调,没有竞争,也就无需大量的状态,在这种环境下编写程序,关注点更聚焦,降低了编程的心智负担。

可能有人会觉得反直觉,单线程如何做到并发?并发从微观上看分时复用的,而从宏观角度看是同时进行的。早在单核时代便支持了线程机制,使用线程做并发,由操作系统进行调度实现并发。即便是单核,你也摆脱不了锁,因为一旦你是用了多线程编程,就不能对核数有任何假设,或许哪天你的机器升级后程序就异常了。

使用协程没有这个问题,协程调度器充当了操作系统协程调度的角色,因此操作系统不感知上层应用的情况,也就不会涉及昂贵的上下文切换代价。

从用户角度而言,使用线程,你的执行过程随时可能被切换,导致你不得不加锁、状态;使用协程,你的协程切换由你决定(使用 await 关键字)。这便是线程与协程的核心区别:前者切换权通常在操作系统上,后者切换权在用户手上(抢占与非抢占)。

定义协程

协程相比普通的过程(函数)而言,多了两个动作,主动挂起(await)与恢复(resume)。

其中挂起由用户发起,它通知调度器当前协程在异步等待(async wait)一个结果,要求调度器调度下一个协程。简而言之,每一个 await 点便是协程的一个潜在切换点。

恢复是由调度器执行的,当挂起后的协程收到结果后,由调度器恢复协程的控制流。因此对于用户而言,只需要关注 await 动作。

正因为它比过程多了两个动作,编程语言、框架需要区分一个协程与一个函数,通常在定义协程的时候使用关键字 async 修饰(C++20 通过返回类型区分协程与普通函数)。

在我设计与实现的服务端中,使用 TCP socket,选择 TCP 是因为它是可靠的字节流传输,但它不像 UDP 那样以报文为单位,因此自定义的通讯消息需要接收端自行组包。

那么就这涉及到一个问题,如何区分一个消息的边界,以及组包?通讯协议参考了 18 年软件大赛的方式,消息使用 json 数据格式,并在前 5 个字节标明了 json 数据的长度。

00065{"msg_type": "registration", "msg_data": {"player_name": "test"}}

使用 json 的一个好处在于,一来它是结构化的文本数据,相对于二进制数据无需关注大小端问题;二来它与编程语言无关(虽然起源于 js),因此可以支持任意编程语言客户端的参与。

async def load_message(reader: asyncio.StreamReader):
    length = (await reader.read(5)).decode()
    if not length: raise Disconnected()
    return await reader.readexactly(int(length))

load_message定义的时候通过 async 修饰,表示它是一个协程,因此也就支持 await 动作。

消除回调

Linux 的 io 一般都是同步 IO,例如 read 操作要么阻塞调用者,要么非阻塞调用者,无论如何都需要调用者亲自去获取结果:前者通过阻塞调用者得到结果,后者需要调用者通过轮询方式。

这就是同步与异步的差别,异步无需调用者去获取结果,它是被动的接受结果。当然将同步 io 改成异步 io 也很简单,起一些线程专门做这件事,并通过回调通知,这是很典型的编程模式。异步使得回调的存在,回调又可能进一步导致竞争问题。

在协程 load_message 通过 await read 表明异步地等待 read 的结果,并通知调度器调度下一个协程,例如这里的 read 就是一个协程,调度器每次根据调度策略来决定是调度其他协程,还是轮询 io 结果等。一旦有了轮询的结果,恢复协程 load_message 的控制流。

load_message没有回调,也没有被阻塞,整个过程非常连贯,以直观的序列模型编写异步代码。

消除状态机

对于这个服务端而言,可能有人觉得即便不用协程,直接使用单线程,并使用 epoll/select 等接口监听是否收到消息,收到消息后分发给消息处理函数也能完成整个任务,这种方式是经典的事件驱动、Reactor 模式。

然而这个分发过程涉及到状态机,例如收到客户端的一条消息,然后根据消息进入状态机所设定的一系列处理函数。

如果一个完整的行为涉及消息种类比较多,且大量的消息交互,那么需要维护一个巨大的状态机,而状态的迁移却被割裂在代码各个地方,无法看透一个完整的流程。

状态模型和序列模型都能较好的反映一件事情的本质,但不同的是,序列模型更加专注于目标系统的行为,而状态模型更关注被操作对象的状态迁移。

但从实现的角度来看,如果你选择序列模型,当然,你需要实现目标系统的行为;而如果你选择状态模型,那么除了需要管理状态机之外,你仍然需要处理目标系统的行为。这就意味着,在实现层面,一旦选择状态模型,就需要做许多额外的工作。

因此,为了在代码层面理解一个状态机的设计,必须仔细的阅读相关代码,并在不同代码间来回跳转(因为状态一直在跳转),才能理解一个状态机的全貌。

对于的复杂的系统而言,由于各种并发,及并发的丰富组合,要么会导致状态的急剧膨胀,以至于状态机及其的晦涩,难以理解和维护。

以这个游戏服务端为例,一个完整的过程涉及三大部分:用户注册、游戏开始、游戏结束。使用协程可以很好的看到整个过程,无需状态机。

async def handle_connection(reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
    try:
        player = await game_service.handle_registration(reader, writer) # 注册
        await game_service.game_loop(player, reader, writer) # 游戏开始
        await game_service.handle_game_over(player, writer) # 游戏结束

    except Disconnected:
        logger.info("peer closed connection")
    except StatusError:
        logger.info("current game isn't registration state")
    except ErrMsgType as err:
        logger.warn("receive error msg type: {}".format(err._type))
    finally:
        logger.info("disconnect client")
        writer.close()

async def game_loop(self, player: Player,
                    reader: asyncio.StreamReader,
                    writer: asyncio.StreamWriter):
    while self._round <= MAX_ROUND:
        writer.write(self.inquire(player)) # 向客户端发送查询消息
        await writer.drain()

        await self._handle_action(player, reader) # 客户端回复游戏指令
        self._checkout_actions()

如无必要,请勿引入多余的状态。

消除锁

既然使用协程,那么可以进行无锁编程,应用程序无需再考虑死锁、竞争问题。当指定的客户端数量注册就绪后,游戏将进入开始阶段。开始游戏后,如果有其他客户端连接,那么会拒绝接入,因此需要一个状态表明当前游戏的阶段。

class GameService:
    ST_REGISTRATION = 1
    ST_STARTED      = 2

在所有的客户端从注册状态并发进入开始状态,对状态的置位无需加锁,即便与此同时可能会存在新的客户端接入对此状态的判断。

@serialize_json
def game_start(self, player: Player): # 所有客户端都会并发进入该函数
    def _start():
        if self._status == GameService.ST_STARTED: return # 状态判断
        logger.info("current players: {}".format(len(self._players)))
        self._game_map.alloca_object_id(self._players)
        self._game_map.dump()
        self._status = GameService.ST_STARTED # 状态置位
    _start()

    return {...};

game_start是一个普通的函数,在函数结束前都不会被切换,它独占着整个调度器的线程,好在这个函数没有耗时的操作,也没有多余的等待。

无锁编程从根本上避免了死锁的问题。

同步原语

前面提到,在指定数量的客户端都注册后,游戏才进入开始阶段。因此需要处理客户端的协程等待,等到数量集齐后,才同步执行后边的流程。

因此协程需要提供一定的同步手段,和传统的并发编程一样,它提供了条件变量等同步原语。

async def handle_registration(self, player_id: int,
                            reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
    if self._status != GameService.ST_REGISTRATION: raise StatusError
    message = await load_message(reader)
    player = await self.registration(message, player_id)
    async with self._players_cond: # 条件变量做同步
        await self._players_cond.wait_for(lambda:
            len(self._players) >= MAX_PLAYER_NUM) # 异步地等待所有客户端集齐

    writer.write(self.game_start(player)) # 给客户端发送开始消息
    await writer.drain()
    return player

另一个比较有意思的设计,每回合服务端收集所有客户端的指令,并在这个回合内依次结算。这就要求所有客户端在指定时间内响应,提前收集完则提前进入结算阶段,或者直到有一方超时,将只处理收到的指令。如何实现?

我们可以异步等待一个接受客户端指令的协程,并要求在指定时间内响应,否则抛出超时异常。与此同时,我们不能一收到客户端的指令就处理,还得在指定时间内收集完其他客户端的指令。因此可以起一个协程专门等待是否收集完所有指令,并同时(concurrent/gather 语义)等待它们。

async def _handle_action(self, player: Player, reader: asyncio.StreamReader):
    @parse_json
    def action(data):
        return data

    async def _waitAction(): # 指定时间内收集当前客户端的指令,否则超时异常
        message = await asyncio.wait_for(load_message(reader),
                                         ACTION_TIME_OUT)
        self._actions.append((player, action(message))) # 指令暂存
        async with self._actions_cond:
            self._actions_cond.notify_all() # 同步原语

    async def _waitAllAction(): # 同时收集所有客户端的指令,超时正常返回
        try:
            async with self._actions_cond:
                await asyncio.wait_for(
                    self._actions_cond.wait_for(lambda:
                        len(self._actions) == len(self._players)),
                    ACTION_TIME_OUT + 1e-3) # 条件变量,判断是否集齐指令
        except asyncio.TimeoutError:
            logger.warn("wait all action timeout, checking actions...")

    try:
        # 同时等待这两个协程的结果
        await asyncio.gather(_waitAction(), _waitAllAction())
    except asyncio.TimeoutError:
        player._timeout_times += 1
        logger.warn(f"player {player._name} action timeout!");
        logger.warn(f"(times: {player._timeout_times})")

同步原语是必须的,当它们并不是为了防止资源竞争。

注意事项

协程不能有任何阻塞的行为,因为它使得调度器被白白浪费在空等的协程上,使得其他协程没机会被调度到,调度效率也大大降低了。

因此协程使用的 io 接口都是协程化的,这些 io 协程在系统调用封装之上,避免了阻塞和等待。在这个游戏服务器里边需要服务端自行退出,当游戏达到结束状态时,如何自行退出?

async def main():
    robot_server.game_service =  robot_server.GameService(game_map)
    server = await asyncio.start_server(robot_server.handle_connection,
                                        "0.0.0.0", 6789)
    async with server:
        await server.serve_forever() # 如何在指定条件下结束?

asyncio文档中提到,需要 cancelserver_forever协程,才能正确的退出。

coroutine serve_forever()

Start accepting connections until the coroutine is cancelled. Cancellation of serve_forever task causes the server to be closed.

因此可以并发起一个协程在背后检测游戏服务端的状态,以便结束的时候对上述协程进行cancel

async with server:
    forever_task = asyncio.create_task(server.serve_forever())
    async def check_exit():
        while robot_server.game_service._status != GameService.ST_END:
            await asyncio.sleep(0) # 必须的
        forever_task.cancel()

    asyncio.create_task(check_exit())

    try:
        await forever_task
    except asyncio.CancelledError:
        print("game server exit...")

在协程 check_exit 是一个 while 循环,循环体内使用了 await sleep(0)sleep 本身就是一个协程,它表明主动放弃控制权,让调度器调度其他协程。如果不使用await,那么这个协程将会一直运行,其他协程没有任何执行的可能。

sleep() always suspends the current task, allowing other tasks to run.

Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.

除了 asyncio 提供对系统 io 封装后的协程,用户可能会遇到一些真正耗时的计算协程,或者其他阻塞调用,这时候可以通过库提供的线程池、进程池接口run_in_executor,将这些耗时任务扔到线程池、进程池上运行,以避免阻塞调度线程。

其他抽象手段

异常处理

传统的错误码方式虽然执行效率高,但每一条语句面临着判断错误码、满屏代码中有一半的代码都在判断错误的尴尬场景。这也是开发效率与执行效率的权衡。

介于错误码方式与异常方式之间的手段还有 Result Monad 抽象,但它不是本节讨论的范围。

使用异常处理的代码更聚焦于逻辑,没有其他的噪音,你无需在编写代码的时候考虑异常,因为异常关注点被分离,要么你专注于处理异常,要么你专注于编写逻辑。

async def handle_connection(reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
    try:
        player = await game_service.handle_registration(reader, writer) # 注册
        await game_service.game_loop(player, reader, writer) # 游戏开始
        await game_service.handle_game_over(player, writer) # 游戏结束

    except Disconnected:
        logger.info("peer closed connection")
    except StatusError:
        logger.info("current game isn't registration state")
    except ErrMsgType as err:
        logger.warn("receive error msg type: {}".format(err._type))
    finally:
        logger.info("disconnect client")
        writer.close()

如果条件允许,请使用异常处理异常。

装饰器设计模式

装饰器设计模式可以在不改变功能的前提下扩展功能,例如要求在每个函数调用前、后分别执行其他动作。

对于每次收到数据包,解析动作可以被分离出去,函数只关注对数据的处理,可以在对数据处理之前,使用装饰器进行解析,然后扔给处理函数。发送数据包也是类似的。

Python 自带了装饰器特性,因此无需我们编写一大堆样板代码。装饰器是一个高阶函数,接受一个被装饰的函数。

def serialize_json(func): # 定义装饰器,装饰函数 func
    def wrapper(*argv, **kw):
        buf = json.dumps({
            "msg_type": func.__name__,
            "msg_data": func(*argv, **kw)
        })
        packed = "{:05d}{}".format(len(buf), buf).encode()
        logger.info(f'packed message `{packed}`')
        return packed
    return wrapper

@serialize_json
def game_over():
    return [{
        "team_name": team._name,
        "shield_num": team._registry._shield_num
    } for team in self._teams]

game_over 函数执行后,对函数返回的数据进行序列化。

工程集成

最终服务端需要上线,以便参赛者能够参与进来。假设有 100 个参赛者,每局一个玩家且需要 1 分钟的比赛,那么需要 100 分钟才能跑完一轮。而服务器是 192 核的,它能并行处理 192 个任务,如何提高服务器效率?

我的答案是使用传统的 Makefile 方式,依赖关系是 参赛者 -> 比赛日志,参赛者之间没有任何依赖,因此可以利用多核的优势并行进行,可能在 1 分钟内便能得到上百场比赛的结果。

make -f wildcard.mk all

参考链接

支持一下
扫一扫,支持Netcan
  • 微信扫一扫
  • 支付宝扫一扫