协程版本说明

协程说明

tarscpp 3.x全面启用对协程的支持!

服务框架全面融合协程, 使得使用者可以方便的使用协程, 本文主要介绍如何使用协程.

关于协程, 你需要重点的了解以下几点概念:

  • 目前基础库中协程的调度和切换使用的boost库中的源码实现(几个汇编文件)

  • 协程本质上一种轻量的线程, 简单得理解你可以在线程中调度协程, 即启动/休眠/唤醒等

  • 目前不支持跨线程调度协程, 即每个线程都有一个协程调度器, 通过该对象来调度该线程中的协程

框架和协程相关的类

  • TC_CoroutineInfo, 协程信息类, 正常情况下业务代码不需要感知该对象

  • TC_Coroutine, 协程类, 继承于线程类(TC_Thread), 用来给业务快速使用协程, 这个类主要在一个线程中启动一批协程

  • TC_CoroutineScheduler, 协程调度器类, 负责管理和调度协程, 本质上就是管理和调度TC_CoroutineInfo

  • 如果希望自己创建和调度协程, 需要重点理解TC_CoroutineScheduler

  • 每个线程都有唯一的TC_CoroutineScheduler, 可以通过: TC_CoroutineScheduler::create/scheduler获取掉调度器, 从而自己控制协程创建等逻辑

  • TC_Thread, 线程类增加了协程的支持

  • 使用它的startCoroutine方法启动以后, TC_Thread::run则在协程运行

  • 可以run中使用TC_CoroutineScheduler来自己实现协程的创建等逻辑

  • TC_CoroutineQueue, 用于跨线程的协程的数据交互, 队列没有数据时, 协程会阻塞在epoller上, 当有网络事件或者其他协程调度时会被唤醒处理其他事件!

协程启动的方式

通常在业务服务中, 不需要自己去启动协程, 只需要根据服务模型配置服务模型即可, 服务框架会根据配置, 自动将服务的线程变成协程, 即处理业务代码在协程中执行, 此时发起rpc调用即使是同步方式, 会变成协程模式, 从而变成异步模式, 具体后续文档中会介绍.

当然如果你想自己控制启动协程, 可以参考以下方式.

线程转协程

如果你处于任何一个线程中, 你希望在当前线程能调度协程, 你可以参考以下示例代码

auto scheduler = TC_CoroutineScheduler::create();
scheduler->setPoolStackSize(10, 128*1024);
scheduler->go([&]()
{
    scheduler->setNoCoroutineCallback([=](TC_CoroutineScheduler* s)
    {
        s->terminate();
    });

    cout << "incoroutine" << endl;
});

scheduler->run();

说明:

  • 理论上任何线程都能调度协程, 这时候你需要做的是创建协程调度器TC_CoroutineScheduler, 然后用协程调度器创建和调度协程

  • 每个协程都要消耗一定的空间, 你可以用setPoolStackSize来控制线程中协程的个数以及每个协程消耗的内存大小, 这个内存大小指的协程的栈空间, 因此你使用协程的时候请注意不要在栈上分配太多内存

  • setPoolStackSize需要在run之前执行

  • 协程如果要被调度, 比如运行: scheduler->run(), 这是一个阻塞函数, 会阻塞当前线程, 同时会调度创建出来的协程

  • 通常情况下, run不会返回, 但是如果希望结束调度, 可以调用terminate来结束协程的调度

  • run函数会阻塞住, 注意调度器其实阻塞在epoller上, 因此TC_CoroutineScheduler可以通过getEpoller方法拿到epoller指针

  • 原则上该epoller对象可以用于网络通信, 而框架上也是这样实现的, 因此协程调度和网络收发可以通过同一个epoller来调度

  • 正是因为统一了epoller对象, 框架上能做到rpc调用和服务端线程, 完全在一个线程中调度(通过协程来调度唤醒), 从而网络收发是异步的且不切换线程, 一定程度降低了通信的延时

  • setNoCoroutineCallback表示如果协程调度器发现没有任何协程可以调度时执行的函数, 上面示例中是调用了terminate方法结束了协程调度

TC_Thread

自己创建调度器, 控制调度毕竟还是麻烦, 因此提供了扩展了TC_Thread是的更加方便.

TC_Thread在9.x以前版本中, 代表的线程, 当你需要实现线程的时候, 可以继承TC_Thread, 然后实现run方法, 调用start即可启动一个线程.

在9.x版本中, 你可以同样的方式继承TC_Thread, 但是调用startCoroutine方法启动协程, 即此时run方法处于协程中, 比如以下示例代码:


class CoThread : public TC_Thread
{
public:
	CoThread() {}

	virtual void run()
	{
		cout << "in coroutine" << endl;	
	}
};

//创建了一个线程
CoThread* a = new CoThread();

//以协程方式启动的
a->startCoroutine(10, 128*1024, true);

//等待线程结束
a->join();

delete a;

说明:

  • startCoroutine的前两个参数是控制该线程中, 协程池的个数和栈大小, 第三个参数是表示: 如果没有活跃的协程可调度时, 线程是否自动退出(如果为false, 即使没有协程了, 也不会退出).

  • 此时run本质在协程中调度起来的, 因此你在run中可以调度更多的协程

进一步, 在协程中启动更多的协程:


class CoThread1 : public TC_Thread
{
public:
	CoThread1() {}

	virtual void run()
	{
        //使用调度器以协程方式启动其他协程
		TC_CoroutineScheduler::scheduler()->go(std::bind(&CoThread1::doCo, this));
	}

	void doCo()
	{
		CoThreadDo = true;
	}
};

CoThreadDo = false;

CoThread1* a = new CoThread1();

a->startCoroutine(10, 128*1024, true);

a->getThreadControl().join();

ASSERT_TRUE(CoThreadDo);

delete a;

说明:

  • 在run中可以拿到调度器, 以协程方式启动其他协程

  • 注意协程调度器对于本线程来说是唯一的, 就是在同一个线程中地方都可以拿到当前线程的调度器

  • TC_CoroutineScheduler提供了sleep, yield, put等函数, 可以用来调度协程

  • 每个线程都有一个id, 这个id在本线程中协程是唯一的, 跨线程的协程id不唯一

  • TC_CoroutineScheduler每个线程都有自己独立的对象, 通过create创建, 通过scheduler获取

  • 这个例子中由于启动的startCoroutine, 因此TC_CoroutineScheduler已经被创建出来了, 在run中只需要使用即可

TC_Coroutine

这个类可以一次创建多个协程出来, 拥有TC_Thread之后, 这个类其实用得相对较少, 示例代码如下:


class MyCoroutine : public TC_Coroutine
{
protected:
	void handle()
	{
		++_count;

		this->go(std::bind(&MyCoroutine::co_test, this));
	}

	void co_test()
	{
		++_count;
	}

public:
	static atomic<int> _count;
};

atomic<int> MyCoroutine::_count{0};

MyCoroutine::_count = 0;

MyCoroutine co;

co.setCoroInfo(10, 200, 128*1024);

co.start();

co.join();

说明:

  • 业务可以继承这个类并实现handle方法

  • handle会被调用10次, 即创建了多个协程则被调用多少次

服务模型和协程

服务模型扩展成四种, 同时服务模型下层到tc_epoll_server中, 即最底层的服务也能设置协程模型, 示例可以参考源码中: unittest/test_tc_epoller_server.cpp, 四个服务模型中和协程相关的重点是:

  • NET_THREAD_QUEUE_HANDLES_CO

  • NET_THREAD_MERGE_HANDLES_THREAD

  • NET_THREAD_MERGE_HANDLES_CO

说明:

  • 这三种模型, 实际处理业务都在协程中执行, 即协程调度器已经都创建了

  • NET_THREAD_QUEUE_HANDLES_CO & NET_THREAD_MERGE_HANDLES_CO 两种模式下, 服务端业务处理线程处于协程状态, 这一点非常重要, 可以和rpc的通信器配合, 后续会介绍到

  • NET_THREAD_MERGE_HANDLES_THREAD这种模式, 虽然服务器已经处于协程中了, 但是业务线程不是默认感知到, 没有设置ServantProxyThreadData::getData()->_sched

对于底层的epollserver, 可以通过api控制服务的模型和协程参数:

//设置服务模型
TC_EpollServer::setOpenCoroutine

//设置协程池梳理和栈大小
TC_EpollServer::setCoroutineStack

对于rpc服务而言, 可以通过参数控制模型, 修改模板即可

<taf>
    <application>
        <server>
            coroutinememsize=1073741824
            coroutinestack=131072
            opencoroutine=0
        </server>
    </application>
</taf>

注意:

  • opencoroutine得值从0~3, 分别对应TC_EpollServer::SERVER_OPEN_COROUTINE得值

  • coroutinememsize表示协程池总共可以分配的协程栈的内存

  • coroutinestack每个协程分配的栈空间大小

通信器和协程

通信器是rpc调用的客户端资源集合体, 理解它的模型是非常关键的.

普通的通信器模型

9.x以前的版本我们称之为普通的通信器模型, 具体说明如下:

  • 通信器内部其实有独立的网络线程(可能多个), 网络线程主要就负责网络数据的收发.

  • 业务线程发送数据时, 会唤醒网络线程去发送

  • 如果是同步调用模式, 业务线程会线程锁阻塞, 当网络线程收到回包以后, 会通过锁唤醒业务线程

  • 如果是异步调用模式, 业务线程会继续执行, 当网络线程收到回包丢给异步callback线程(该线程个数也可以配置), 然后在异步回调线程中执行回调

  • 以上流程中, 可以看到当发送数据时, 会有多次线程切换, 如果加上服务器端的线程切换, 一次rpc会至少会涉及到4次线程的切换.

协程模式下的通信器

协程的版本诞生可以减少线程的切换调度, 降低rpc的延时, 协程模式的网络通信器确实很复杂很多, 我们来看一下协程模式下通信器的设计:

  • 普通通信器网络线程其实是独立的, 和业务线程没有任何关系, 但是协程模式下, 如果业务线程是协程且开启了协程网络通信器, 那么每个业务线程中都有自己独立的网络通信器;

  • 此时网络通信器不再自带网络线程, 而是直接使用了业务线程来收发网络数据, 即网络通信器的网络epoller直接使用了业务线程中协程调度器的epoller, 这样将协程调度和网络调度融合和统一了!

这种模式下具体的rpc逻辑如下:

  • 同步调用

  • 业务线程将数据放入队列中以后, 直接调用网络通信器发包(当然第一次会自动创建网路通信器), 同时业务线程的协程调度器的底层epoller对象会传递给网路通信器

  • 如果响应包没有回来, 则yield(注意业务线程处于协程中, yield即表示自身放弃调度)

  • 当网络回包回来以后, epoller会感知到, 接收数据分析数据是哪个协程发送的, 然后put(coroId), 唤醒协程, 从而之前业务线程会在yield中醒过来

  • 整个过程, 对于客户端而言, 全部在业务线程中执行

  • 异步调用

  • 异步调用的模式和同步调用模式几乎一样, 只是发起rpc的协程, 发送包以后并不yield, 而是继续执行

如何开启协程模式的通信器

并不是只要业务线程启用了协程, 就默认开启了协程模式的通信器, 还需要做以下处理:

ServantProxyThreadData::getData()->_sched = TC_CoroutineScheduler::scheduler();

只有这样设置以后, 通信器才会感知到业务处于协程模式, 且启用协程网络通信器.

**注意老版本协程模式相当于 opencoroutine=1 **

NET_THREAD_QUEUE_HANDLES_CO & NET_THREAD_MERGE_HANDLES_CO 两种模式下, 服务端的业务处理线程, 默认就已经设置了, 这样在服务器业务线程中发起rpc时, 本质上是协程模式, 网络收发都在业务线程中处理的!

协程模式下有哪些影响

  • 业务线程必须开启协程(即存全局的协程调度器, 同时设置了 ServantProxyThreadData::getData()->_sched调度为线程全局调度器), 这样通信器才能感知到协程模式;

  • 由于每个业务线程都有自己的网路通信器, 原则上, 内存消耗会加大(当然这点内存可以忽略不计);

  • 每个网络通信器都会自己独立的连接, 这样会带来一个prx背后对应了多条服务的连接, 加大了服务的连接数, 当然服务现在都是epoller实现, 这几乎不是一个问题;

  • 协程模式, 每个协程有自己的栈空间, 因此会加大内存消耗!

Last updated