taf 异步调用源码分析

导语:对于服务端来说,异步处理相比同步处理在性能上可能会有成倍的提高,本篇就对taf的异步处理进行一个简单的分析。描述客户端进行异步调用之后,taf的都进行了哪些的处理。

本文主要是项目在架构升级异步的时候,时不时出现死锁现象,因此想了解一下异步处理的时候线程执行逻辑是什么,最后看完了,发现和 taf 无关。

本文介绍基于 taf 协议的异步调用,rpc 和 http 的也基本类似。异步调用是通过 async_XXX 接口,调用时需要传递一个异步回调对象。通过定义 jce 协议,taf 自带工具是可以自动生成相应的异步调用的接口实现。代码如下:

   
void AppTafProxy::async_doRequest(AppTafPrxCallbackPtr callback,const std::string &param,const map<string, string>& context)
		{   
				taf::JceOutputStream<taf::BufferWriter> _os;
				_os.write(param, 1); 
				std::map<string, string> _mStatus;
				taf_invoke_async(taf::JCENORMAL,"doRequest", _os.getByteBuffer(), context, _mStatus, callback);
		}  

在该方法中,通过调用父类 ServantProxy 的 taf_invoke_async 方法进入异步处理。在 taf_invoke_async 中初始化请求的消息体 ReqMessage,设定请求类型为异步、设定异步回调的对象 cb(在异步处理结束之后,调用该 cb 的方法),初始化其他相应必要信息,然后调用 invoke。无论是同步还是异步调用,最后都是会调用 invoke 方法。

在 invoke 方法中会根据同步、异步还是协程进行不同的处理。对于异步调用,还会根据是否开启协程来进行不同的处理,这里分析的是不开启协程的情况下的异步调用逻辑。在 invoke 方法中把请求都会添加到 epoll 的监听事件中,设置相应的 FDinfo。代码如下:

pObjProxy->getCommunicatorEpoll()->notify(pSptd->_iReqQNo, pReqQ); //这里的 pReqQ 就是包含了上一个方法中初始化的 ReqMessage

void CommunicatorEpoll::notify(size_t iSeq,ReqInfoQueue * msgQueue)
{  
		//因为初次发起该请求,所以 iSeq 位置肯定是不合法,进入 else 生成一个新的元素
		if(_notify[iSeq].bValid)
		{
				_ep.mod(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
				assert(_notify[iSeq].stFDInfo.p == (void*)msgQueue);
		}
		else
		{
				 _notify[iSeq].stFDInfo.iType = FDInfo::ET_C_NOTIFY;
				_notify[iSeq].stFDInfo.p=(void*)msgQueue;
				_notify[iSeq].stFDInfo.fd = _notify[iSeq].eventFd;
				_notify[iSeq].stFDInfo.iSeq = iSeq;
				_notify[iSeq].notify.createSocket();
				_notify[iSeq].bValid=true;
				_ep.add(_notify[iSeq].notify.getfd(),(long long)&_notify[iSeq].stFDInfo, EPOLLIN);
		}
}

加入 epoll 监听事件之后,接下来的逻辑就出现了同步异步的区别:对于同步调用会有一个 ReqMonitor 一直等待直到调用返回;而对于异步调用则会在添加监听之后直接返回。无论是同步还是异步调用都是在 epoll 轮询,发现事件 ready 调用方法处理。下面就是删除了不必要代码之后轮询执行逻辑,如下:

void CommunicatorEpoll::run()
{
		TLOGDEBUG("CommunicatorEpoll::run id:"<<syscall(SYS_gettid)<<endl);
	ServantProxyThreadData * pSptd = ServantProxyThreadData::getData();
		pSptd->_netThreadSeq = (int)_netThreadSeq;

		while (!_terminate)
		{
				//考虑到检测超时等的情况 这里默认就 wait100ms 吧
						int num = _ep.wait(_iWaitTimeout);
						for (int i = 0; i < num;   i)
						{
								const epoll_event& ev = _ep.get(i);
								uint64_t data = ev.data.u64;
								if(data == 0) continue; //data 非指针, 退出循环
								handle((FDInfo*)data, ev.events);
						}
						doTimeout();  
						doStat();
		}
}  

void CommunicatorEpoll::handle(FDInfo * pFDInfo, uint32_t events)
{
				if(FDInfo::ET_C_NOTIFY == pFDInfo->iType)
				{
						ReqInfoQueue * pInfoQueue=(ReqInfoQueue*)pFDInfo->p;
						ReqMessage * msg = NULL;
						 while(pInfoQueue->pop_front(msg))
						 {
										msg->pObjectProxy->invoke(msg);
						 }
				}
				else
				{  
					 //handle events:EPOLLIN EPOLLOUT EPOLLRDHUP EPOLLERR   
					 Transceiver *pTransceiver = (Transceiver*)pFDInfo->p;  
					 if (events & EPOLLIN)  
					 {  
								handleInputImp(pTransceiver);  
						}
				}   
}  

事件 ready 之后的调用 handle 方法。在 handle 方法中会根据具体的事件类型执行不同的逻辑。根据上面 invoke 方法中添加事件代码,当发起异步调用的时候设定的 FDInfo 类型,进入 ET_C_NOTIFY 分支。调用逻辑是通过 pObjectProxy->invoke 中调用 pAdapterProxy->invoke,最后的逻辑执行是在 AdapterProxy 中。

在 AdapterProxy 中 invoke,首先检查当前队列大小,如果大于最大值,则调用 finishInvoke 方法后直接返回;否则把数据交给网络传输_pTrans 发送数据。无论是否发送成功,都会把请求加入_pTimeoutQueue 队列等待处理。此时你调用的接口刚把数据发送出去,然后 epoll 继续在后台监听。

    
int AdapterProxy::invoke(ReqMessage * msg)
{
		 if(_pTimeoutQueue->getSendListSize() >= _sendQueueLimit)
		{
	msg->eStatus = ReqMessage::REQ_EXC;
				finishInvoke(msg);
				return 0;
		}
		//生成 requestid taf 调用 而且 不是单向调用
		if(!msg->bFromRpc)
		{
				msg->request.iRequestId = _pObjectProxy->generateId();
		}
		_pObjectProxy->getProxyProtocol().requestFunc(msg->request,msg->sReqData);

		//交给连接发送数据 发送数据成功
		if(_pTimeoutQueue->sendListEmpty()
				&& _pTrans->sendRequest(msg->sReqData.c_str(),msg->sReqData.size()) != Transceiver::eRetError)
		{
				if(msg->eType == ReqMessage::ONE_WAY)
				{
						delete msg;
						return 0;
				}

				bool bFlag = _pTimeoutQueue->push(msg, msg->request.iRequestId, msg->request.iTimeout   msg->iBeginTime);
	if(!bFlag)
	{
		msg->eStatus = ReqMessage::REQ_EXC;
		finishInvoke(msg);
	}
		}
		else
		{
				bool bFlag = _pTimeoutQueue->push(msg,msg->request.iRequestId, msg->request.iTimeout msg->iBeginTime, false);
	if(!bFlag)
	{  
								msg->eStatus = ReqMessage::REQ_EXC;
					finishInvoke(msg);
	}
		}
		return 0;
}

CommunicatorEpol 的 run 方法在上面已经有简单说明,定期检查监听的套接字,事件 ready 之后,调用 handle 方法处理事件,判断事件发生的类型。上面数据在_pTrans 发送成功之后,调用的接口处理完成会发送回包,然后 epoll 监听事件 ready 继续调用 handle 方法,这次发现是数据进来的事件调用 handleInputImp 接受数据。

   
void CommunicatorEpoll::handleInputImp(Transceiver * pTransceiver)
{
		if(pTransceiver->isConnecting())
		{
				int iVal = 0;
				socklen_t iLen = static_cast<socklen_t>(sizeof(int));
				if (::getsockopt(pTransceiver->fd(), SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&iVal), &iLen) == -1 || iVal)
				{
						pTransceiver->close();
						pTransceiver->getAdapterProxy()->addConnExc(true);
						return;
				}
				pTransceiver->setConnected();
		}

		list<ResponsePacket> done;
		if(pTransceiver->doResponse(done) > 0)
		{
				list<ResponsePacket>::iterator it = done.begin();
				for (; it != done.end();   it)
				{
						pTransceiver->getAdapterProxy()->finishInvoke(*it);
				}
		}
}

无论是同步还是异步请求,请求处理最后都是回到了 AdapterProxy 的方法 finishInvoke。对于同步则是会唤醒之前的 ReqMonitor,通知处理结束;对于异步则是把请求处理结果塞回异步处理线程中的队列,异步处理线程一直监听自己任务队列,发现有任务则取出消息体进行处理。

需要注意的是在向异步线程任务队列 push 任务的时候是平均分配的,没有考虑每个异步处理线程的负载,也不考虑是那个业务线程 push 进来的(意思是同样一个业务线程接受到请求,可能一直是同一个异步处理线程处理,也可能多个异步线程同时处理来自同一个业务线程的请求)。我们遇到的多线程数据共享的问题就是来自于这里,看完源码才想通。

   
void AdapterProxy::finishInvoke(ReqMessage * msg)
{
		if(msg->eType == ReqMessage::ONE_WAY)
		{
				delete msg;
				return ;
		}
		//stat 上报调用统计  
		stat(msg);
		if(msg->eType == ReqMessage::SYNC_CALL)
		{
				//handle sync,wake monitor
				return ;
		}
		if(msg->eType == ReqMessage::ASYNC_CALL)
		{
	if(!msg->bCoroFlag)
	{
		if(msg->callback->getNetThreadProcess())
		{
			//如果是本线程的回调,直接本线程处理,比如获取 endpoint
			ReqMessagePtr msgPtr = msg;
			msg->callback->onDispatch(msgPtr);
		}
		else
		{
			//异步回调,放入回调处理线程中
			_pObjectProxy->getCommunicatorEpoll()->pushAsyncThreadQueue(msg);
		}
	}
	else
	{
		//开启协程
	}
		}
		return;
}

通过阅读异步处理线程可以发现,处理逻辑是调用消息体里的回调对象的 onDispatch 方法,而该回调对象则是你最初进行异步调用的时候传递进去的。对于 onDispatch 则会根据你定义的异步对象的不同执行逻辑也不同。

   
void AsyncProcThread::run()
{
		while (!_terminate)
		{
				ReqMessage * msg;
				//异步请求回来的响应包处理
				if(_msgQueue->empty())
				{
						TC_ThreadLock::Lock lock(*this);
						timedWait(1000);
				}

				if (_msgQueue->pop_front(msg))
				{
						//从回调对象把线程私有数据传递到回调线程中
						ServantProxyThreadData * pServantProxyThreadData = ServantProxyThreadData::getData();
						if(msg->adapter)
			{
								snprintf(pServantProxyThreadData->_szHost, sizeof(pServantProxyThreadData->_szHost), "%s", msg->adapter->endpoint().desc().c_str());
			}
			ReqMessagePtr msgPtr = msg;
			msg->callback->onDispatch(msgPtr);
				}
		}
}

这样整个异步处理逻辑就结束了。

如果你业务需要调用另外一个服务的时候,那么使用异步会很明显提升性能的。主线程接受到请求之后执行一些业务必要逻辑处理就调用异步处理直接返回了,比较耗时网络 io 和之后的处理都放在了异步处理线程,主线程可以解放出来继续接受接下来的链接。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏xingoo, 一个梦想做发明家的程序员

【设计模式】—— 单例模式Singleton

  模式意图   保证类仅有一个实例,并且可以供应用程序全局使用。为了保证这一点,就需要这个类自己创建自己的对象,并且对外有公开的调用方法。   模式结构 ? ...

1777
来自专栏偏前端工程师的驿站

不懂JQuery的孩子:自封装Ajax函数

前言                                       一直没痛下决心学习JQuery,但平时项目中又要用到Ajax,于是自己写一个...

32210
来自专栏开发与安全

从零开始学C++之对象的使用(三):static 与单例模式、auto_ptr与单例模式、const 用法小结、mutable修饰符

一、static 与单例模式 单例模式也就是简单的一种设计模式,它需要: 保证一个类只有一个实例,并提供一个全局访问点 禁止拷贝 #include <i...

1830
来自专栏小樱的经验随笔

【Java学习笔记之三十】详解Java单例(Singleton)模式

概念: Java中单例模式是一种常见的设计模式,单例模式的写法有好几种,这里主要介绍三种:懒汉式单例、饿汉式单例、登记式单例。   单例模式有以下特点: 1...

2615
来自专栏程序你好

如何构建创造性设计模式:单例模式

单例设计模式是一种软件设计模式,它将类的实例化限制为一个对象。与其他创造性设计模式(如抽象工厂)相比,单例构建器模式将创建一个对象,并且还将负责只存在该对象的一...

894
来自专栏SHERlocked93的前端小站

JS 单例模式

单例模式 (Singleton) 的实现在于保证一个特定类只有一个实例,第二次使用同一个类创建新对象的时候,应该得到与第一次创建对象完全相同的对象。 当创建一个...

683
来自专栏互联网研发闲思录

界面实时刷新线程信息

实时刷新线程信息,可以使用下面的方式 /** * 初始化jvm连接管理信息. */ $(document).ready(functi...

1788
来自专栏GreenLeaves

C# 静态构造函数

1、在类实例被初始化的时候执行 ? 2、在类的静态成员被调用的时候执行 ? 3、静态构造函数只会被执行一次,代码如下: static class Progr...

3304
来自专栏Python

Python中的单例模式的几种实现方式的及优化

单例模式 单例模式(Singleton Pattern)是一种常用的软件设计模式,该模式的主要目的是确保某一个类只有一个实例存在。当你希望在整个系统中,某个类只...

2346
来自专栏Java后端技术

你敢说自己了解单例模式?

  最近在学习设计模式,在看到单例模式的时候,我一开始以为直接很了解单例模式了,实现起来也很简单,但是实际上单例模式有着好几个变种,并且多线程中涉及到线程安全问...

782

扫码关注云+社区