前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Envoy源码分析之ThreadLocal

Envoy源码分析之ThreadLocal

作者头像
黑光技术
修改2020-05-15 11:48:12
1.2K0
修改2020-05-15 11:48:12
举报
文章被收录于专栏:黑光技术黑光技术

完美之道,不在无可增加,而在无可删减!

ThreadLocal整体结构

Envoy中严重依赖ThreadLocal,为了避免加锁Envoy会尽可能在单一线程中完成所有的事件,但是多个线程之间难免会有一些数据需要共享,还有可能需要读写,为了避免加锁Envoy将一些需要在线程之间共享的数据放在ThreadLocal中,当ThreadLocal中的数据需要更新的时候则会通过主线程将更新后的数据Post到各个线程中,交由各个线程来更新自己的ThreadLocal。Envoy在C++11的thread_local的基础上结合Dispatcher实现了一个ThreadLocal对象。本文则会重点分析下ThreadLocal的设计与实现。先来看下ThreadLocal的整体结构,下文会逐一进行分析。

ThreadLocal实现

ThreadLocalObject是一个空的接口类,要求所有的ThreadLocal数据对象都要继承自这个空接口,比如下面这个ThreadLocal对象。

代码语言:javascript
复制
class ThreadLocalObject {
public:
  virtual ~ThreadLocalObject() {}
};
struct ThreadLocalCachedDate : public ThreadLocal::ThreadLocalObject {
   ThreadLocalCachedDate(const std::string& date_string) : 
   date_string_(date_string) {}
   const std::string date_string_;
};

所有的ThreadLocalObject对象会保存在ThreadLocalData中,这是一个使用C++11的thread_local关键字声明的变量,是真正的线程局部存储。这个对象包含了两个成员,其中一个是vector,保存了所有的ThreadLocalObject,另外一个保存的是Dispatcher,指向当前线程的Dispatcher对象。相关代码如下:

代码语言:javascript
复制
struct ThreadLocalData {
  Event::Dispatcher* dispatcher_{};
  std::vector<ThreadLocalObjectSharedPtr> data_;
};

当你要使用ThreadLocal对象的功能时,你需要一个SlotAllocator分配器,从这个分配器可以分配一个Slot,一个Slot包含了一个ThreadLocalObject,从这个Slot中就可以获取到保存在线程局部存储中的ThreadLocalObject对象。下面是Slot对象的结构。

代码语言:javascript
复制
class Slot {
public:
  virtual ~Slot() {}  
  virtual ThreadLocalObjectSharedPtr get() PURE;
  template <class T> T& getTyped() { 
  	return *std::dynamic_pointer_cast<T>(get()); 
  }
  virtual void runOnAllThreads(Event::PostCb cb) PURE;  
  virtual void runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) PURE;
  typedef std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)> InitializeCb;
  virtual void set(InitializeCb cb) PURE;
};

Slot是一个接口类,这个接口提供了几个关键功能,一个就是获取到对应的ThreadLocalObjec对象,另外一个就是在所有注册的线程中执行PostCb类型的回调方法。Slot对应的实现类是SlotImpl

代码语言:javascript
复制
struct SlotImpl : public Slot {
  SlotImpl(InstanceImpl& parent, uint64_t index) : 
		   parent_(parent), index_(index) {}
  ~SlotImpl() { parent_.removeSlot(*this); }
  .......
  InstanceImpl& parent_;  const uint64_t index_;
};

SlotImpl保存了对InstanceImpl的引用,还有一个索引值,这个值是SlotImpl对应的ThreadLocalObject对象在ThreadLocalData中的索引(上文中说到了,所有的ThreadLocalObject对象都存在ThreadLocalData中的一个vector成员中。)通过这个索引就可以快速找到该SlotImpl对应的ThreadLocalObject对象了。接下来再看下SlotAllocatorSlotImpl并不是直接构造来使用的,而是通过SlotAllocator分配的。

代码语言:javascript
复制
class SlotAllocator {
public:
  virtual ~SlotAllocator() {}
  virtual SlotPtr allocateSlot() PURE;
};

SlotAllocator是一个接口,只有一个方法就是allocateSlot,这个方法用于分配一个SlotInstance接口继承自SlotAllocator,对其进行了扩展,是整个ThreadLocal的基础接口,直接暴露给用户使用的。其接口如下。

代码语言:javascript
复制
class Instance : public SlotAllocator {
public:
  virtual void registerThread(Event::Dispatcher& dispatcher, bool main_thread) PURE;
  virtual void shutdownGlobalThreading() PURE;
  virtual void shutdownThread() PURE;
  virtual Event::Dispatcher& dispatcher() PURE;
};

所有要进行数据共享的线程都需要通过registerThread接口进行注册,dispatcher接口则是用来返回当前线程对应的Dispatcher对象。InstanceImpl实现了Instance接口。

代码语言:javascript
复制
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
public:
  InstanceImpl() : main_thread_id_(std::this_thread::get_id()) {}  
  ~InstanceImpl();  
  // ThreadLocal::Instance
  ........

private:
  static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);
  static thread_local ThreadLocalData thread_local_data_;
  std::vector<SlotImpl*> slots_;
  std::list<std::reference_wrapper<Event::Dispatcher>> registered_threads_;
  std::thread::id main_thread_id_;
  Event::Dispatcher* main_thread_dispatcher_{};
  std::atomic<bool> shutdown_{};
};

main_thread_dispatcher_用来保存主线程的Dispatcher对象,registered_threads_用来保存所有注册到ThreadLocal中的Dispatcher对象。slots_则保存了所有分配出去的Slot,每分配出一个Slot就会new一个SlotImpl对象,然后保存在slots_中,使用者通过分配的Slot,拿到其对应的索引值,然后通过setThreadLocal静态方法就可以把要共享的数据放到线程存储中了。

代码语言:javascript
复制
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
  if (thread_local_data_.data_.size() <= index) {
    thread_local_data_.data_.resize(index + 1);
  }

  thread_local_data_.data_[index] = object;
}

线程注册的过程也很简单,就是把传递进来的Dispatcher对象放到registered_threads_中,需要注意的是这里用的是std::reference_wrapper<Event::Dispatcher>,保存的是Dispatcher的引用。

代码语言:javascript
复制
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
  ASSERT(std::this_thread::get_id() == main_thread_id_);
  ASSERT(!shutdown_);
  if (main_thread) {
    main_thread_dispatcher_ = &dispatcher;
    thread_local_data_.dispatcher_ = &dispatcher;
  } else {
    ASSERT(!containsReference(registered_threads_, dispatcher));
    registered_threads_.push_back(dispatcher);
    dispatcher.post([&dispatcher] { 
  		thread_local_data_.dispatcher_ = &dispatcher; 
	});
  }
}

如果是主线程的话,还会额外设置下main_thread_dispatcher_,让其指向主线程的Dispatcher。将Dispatcher对象放到registered_threads_中后,需要更新对应线程的thread_local_data_局部存储中的dispatcher_指针,使其指向线程自己的Dispatcher对象。所以这里是通过Dispatcher的post方法来执行这个callback的,因为post保证callback会和Dispatcher对象所在线程中执行。线程注册完成后就可以通过allocateSlot接口来分配Slot了,这里对于Slot的分配其实是惰性的,只有在需要的时候才会分配。

代码语言:javascript
复制
SlotPtr InstanceImpl::allocateSlot() {
  ASSERT(std::this_thread::get_id() == main_thread_id_);  
  ASSERT(!shutdown_);  
  for (uint64_t i = 0; i < slots_.size(); i++) {
      if (slots_[i] == nullptr) {
      std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, i));
      slots_[i] = slot.get();      
      return std::move(slot);
    }
  }

  std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
  slots_.push_back(slot.get()); 
  return std::move(slot);
}

遍历所有的Slot,如果发现是Slot是空的就会进行分配,如果都没有找到就直接重新分配一个Slot,然后插入到slots_中,有了Slot后需要通过其set方法将要存储的ThreadLocalObject对象放到线程局部存储中。

代码语言:javascript
复制
typedef std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)> InitializeCb;
void InstanceImpl::SlotImpl::set(InitializeCb cb) {
  ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
  ASSERT(!parent_.shutdown_);
  
  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
      const uint32_t index = index_;
    dispatcher.post([index, cb, &dispatcher]() -> void { 
  		setThreadLocal(index, cb(dispatcher)); 
	});
  }  
      
  // Handle main thread.
  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}

首先通过InitializeCb拿到要存储的ThreadLocalObject,然后到所有线程中调用setThreadLocal方法来更新ThreadLocalObject对象到对应线程的局部存储中。这个方法只能在主线程中调用。调用完成后,所有的线程通过Slot就可以访问到存储的ThreadLocalObject对象了。除了存储数据外,SlotImpl还提供了二个用于在所有线程中执行任务的接口。

代码语言:javascript
复制
void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
  ASSERT(std::this_thread::get_id() == main_thread_id_);  
  ASSERT(!shutdown_);  
  for (Event::Dispatcher& dispatcher : registered_threads_) {
    dispatcher.post(cb);
  }  
  cb();
}

void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
  ASSERT(std::this_thread::get_id() == main_thread_id_);  
  ASSERT(!shutdown_);  cb();
  std::shared_ptr<std::atomic<uint64_t>> worker_count =
      std::make_shared<std::atomic<uint64_t>>(registered_threads_.size());
  for (Event::Dispatcher& dispatcher : registered_threads_) {
    dispatcher.post([this, worker_count, cb, 
			all_threads_complete_cb]() -> void {
        cb();
        if (--*worker_count == 0) {
           main_thread_dispatcher_->post(all_threads_complete_cb);
        }
     });
  }
}

因为ThreadLocal保存了所有注册进来的Dispatcher对象,通过Dispatcherpost方法就可以向对应线程投递任务来执行,runOnAllThreads的第二个重载实现可以在所有线程都执行完毕后,回调主线程的all_threads_complete_cb方法,实现方式也是比较简单易懂的,就是通过将一个std::shared_ptr 的原子计数器拷贝到要执行的任务中,任务执行完就递减计数器,等到计数器为0就回调all_threads_complete_cb。到此为止ThreadLocal的两大核心功能就分析完毕了,一个是通过set方法更新所有线程的局部存储,另外一个就是通过runOnAllThreads往所有的线程投递任务。 最后我们来分析下ThreadLocalshutdown过程,这个过程比较难理解,InstanceImpl提供了两个方法用于shutdown。

代码语言:javascript
复制
void InstanceImpl::shutdownGlobalThreading() {
  ASSERT(std::this_thread::get_id() == main_thread_id_);  
  ASSERT(!shutdown_);
  shutdown_ = true;
}

shutdownGlobalThreading方法只是设置了一个shutdown_flag,只能在主线程中调用,这个flag的作用只是用于在Slot析构的时候不通知所有线程将对应Slot从其线程存储中去除,正常情况下一个Slot析构需要更新所有线程的局部存储,从中去掉Slot对应的ThreadLocalObject对象。而在Shutdown的过程则不需要,因为主线程进行shutdown的时候表明其他线程已经shutdown了,其关联的Dispatcher对象已经不存活。所以这种情况下Slot析构什么也不做。

代码语言:javascript
复制
~SlotImpl() { parent_.removeSlot(*this); }
void InstanceImpl::removeSlot(SlotImpl& slot) {
  ASSERT(std::this_thread::get_id() == main_thread_id_);  
  if (shutdown_) {
      return;
  }  
  const uint64_t index = slot.index_;
  slots_[index] = nullptr;  runOnAllThreads([index]() -> void {
      if (index < thread_local_data_.data_.size()) {
      thread_local_data_.data_[index] = nullptr;
    }
  });
}

还有另外一个shutdown函数就是shutdownThread,这个函数会遍历所有的线程存储的数据,然后进行reset操作,最后把整个vector进行clear()。每一个worker都持有InstanceImpl实例的引用,在析构的时候会调用shutdownThread。这个函数的实现如下:

代码语言:javascript
复制
void InstanceImpl::shutdownThread() {
  ASSERT(shutdown_);  
  for (auto it = thread_local_data_.data_.rbegin(); 
	   it != thread_local_data_.data_.rend(); ++it) {
    it->reset();
  }
  thread_local_data_.data_.clear();
}

很奇怪的是这里是逆序来遍历所有的ThreadLocalObject对象来进行reset的,这是因为一些"持久"(活的比较长)的对象如cluster manager很早就会创建ThreadLocalObject对象,但是直到shutdown的时候也不析构,而在此基础上依赖cluster manager的对象的如grpc client等,则是后创建ThreadLocalObject对象,如果cluster manager创建的ThreadLocalObject对象先析构,而grpc client相关的ThreadLocalObject对象后析构就会导致shutdown问题。为此这里选择逆序来进行reset,先从一个高层的对象开始,最后才开始对一些基础的对象所关联的ThreadLocalObject进行reset。例如下面这个例子:

代码语言:javascript
复制
struct ThreadLocalPool : public ThreadLocal::ThreadLocalObject {
    ThreadLocalPool(InstanceImpl& parent, Event::Dispatcher& dispatcher,
                        const std::string& cluster_name);
    ~ThreadLocalPool();
    PoolRequest* makeRequest(const std::string& hash_key, 
				const RespValue& request,
                             PoolCallbacks& callbacks);
    void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed);

    InstanceImpl& parent_;
    Event::Dispatcher& dispatcher_;
    Upstream::ThreadLocalCluster* cluster_;
    std::unordered_map<Upstream::HostConstSharedPtr, 
		ThreadLocalActiveClientPtr> client_map_;
    Envoy::Common::CallbackHandle* local_host_set_member_update_cb_handle_;
};

redis_proxy中定义了一个ThreadLocalPool,这个ThreadLocalPool又依赖较为基础的ThreadLocalCluster(是ThreadLocalClusterManagerImpl的数据成员,也就是Cluster manager所对应的ThreadLocalObject对象),如果shutdownThread按照顺序的方式析构的话,那么ThreadLocalPool中使用的ThreadLocalCluster(其实是ThreadLocalClusterManagerImpl会先析构)会先被析构,然后才是ThreadLocalPool的析构,而ThreadLocalPool析构的时候又会使用到ThreadLocalCluster,但是ThreadLocalCluster已经析构了,这个时候就会出现野指针的问题了。

代码语言:javascript
复制
InstanceImpl::ThreadLocalPool::ThreadLocalPool(InstanceImpl& parent, 
						Event::Dispatcher& dispatcher, 
				                const std::string& cluster_name)
    : parent_(parent), dispatcher_(dispatcher), 
cluster_(parent_.cm_.get(cluster_name)) {
  .....
  local_host_set_member_update_cb_handle_ = 
  cluster_->prioritySet().addMemberUpdateCb(
      [this](uint32_t, const std::vector<Upstream::HostSharedPtr>&,
      const std::vector<Upstream::HostSharedPtr>& hosts_removed) -> void {
        onHostsRemoved(hosts_removed);
      });
}

InstanceImpl::ThreadLocalPool::~ThreadLocalPool() {
  // local_host_set_member_update_cb_handle_是ThreadLocalCluster的一部分
  // ThreadLocalCluster析构会导致local_host_set_member_update_cb_handle_变成野指针
  local_host_set_member_update_cb_handle_->remove();  
  while (!client_map_.empty()) {
    client_map_.begin()->second->redis_client_->close();
  }
}

总结

通过上文我相信我们应该足以驾驭Envoy中的ThreadLocal,从其设计可以看出有其巧妙之处,也有其不足的地方,比如其抽象出一个Slot和对应的线程存储进行了关联,Slot可以任意传递,因为不包含实际的数据,拷贝的开销很低,只包含了一个索引值,具体关联的线程存储数据是不知道的,避免直接暴露给用户背后的数据。而InstanceImpl对象则管理着所有Slot的分配和移除以及整个ThreadLocal对象的shutdown。不足的地方我觉得主要有两点, 其中一个就是Slot的分配机制效率不太高,如果Slot在大量进行了分配和释放后,整个vector中的空闲的Slot其实很稀疏,这个时候如果从头开始遍历来找下一个可用的Slot则效率不高,而且根据Slot分配的特点来看,越靠前的Slot越有可能没释放(越基础的对象,越早创建Slot,但是最后才释放),这样的话,每次遍历找空闲的Slot的时候,其实前N个很大概率都是在做无用功,影响查找的效率。另外一个我觉得不太好的地方就是shutdownThread的实现,这个其实比较难理解,对使用者也没有太多的约束,如果两个ThreadLocalObject产生了依赖(比如A依赖B),但是A是先分配Slot的,B是后分配的,那么这种情况下逆序进行析构的时候会先把B析构,等到析构A的时候,如果在其析构函数中又使用了B就会产生野指针的问题,而且这种情况也没有检查机制很难被发现。

看完本文有收获?请分享给更多人

关注「黑光技术」加星标,关注大数据+微服务

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 黑光技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ThreadLocal整体结构
  • ThreadLocal实现
  • 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档