前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Envoy:httpfilter相关代码阅读

Envoy:httpfilter相关代码阅读

作者头像
灰子学技术
发布2023-10-30 16:05:26
1890
发布2023-10-30 16:05:26
举报
文章被收录于专栏:灰子学技术灰子学技术

本篇文章是envoy httpfilter相关代码阅读的整理和总结,笔者试图通过这篇文章将http filter在envoy内部的管控讲清楚,并且将request和response是如何使用这部分 http filter功能的流程介绍清楚。

httpfilter是netfilter中的一种filter,因为envoy对http支持的细粒度管控很全面,所以将httpfilter又做了一层只是针对http协议的filter chain的管控处理逻辑。

httpfilter 在envoy中采用的是生产者和消费者的处理模式,通过配置文件或者xds协议的配置数据将http filter相关的信息,存放到固定的列表中,在有消息request和response到来的时候,通过异步事件触发对应的响应函数,进而从这些列表中取出对应的filter,依次执行filter的功能,达到使用http filter的目的。

一、生产者部分的逻辑:

在envoy初始化的时候,或者更新httpfilter配置的时候,通过httpconnectionManagerconfig依次将httpfilter存放到filter_factories中。

逻辑代码如下所示:

代码语言:javascript
复制
Network::FilterFactoryCb
HttpConnectionManagerFilterConfigFactory::createFilterFactoryFromProtoTyped()
---→
std::shared_ptr<HttpConnectionManagerConfig>  Utility::createConfig()
---→ 
std::make_shared<HttpConnectionManagerConfig>() { 
  ......
  // 操作的是http_filters
  const auto& filters = config.http_filters(); 
  DependencyManager dependency_manager;
  for (int32_t i = 0; i < filters.size(); i++) {
     processFilter(filters[i], i, "http", "http", i == filters.size() - 1, filter_factories_,
           dependency_manager);
  }
  ......
}
----→ 
void HttpConnectionManagerConfig::processFilter() {
  ......
  auto* factory =
  Config::Utility::getAndCheckFactory<Server::Configuration::NamedHttpFilterConfigFactory>(
    proto_config, proto_config.is_optional());
  ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
     proto_config, context_.messageValidationVisitor(), *factory);
  Http::FilterFactoryCb callback =
  factory->createFilterFactoryFromProto(*message, stats_prefix_, context_);
  ......
  // 这里将filterfactorycb存放到filter_factories中
  filter_factories.push_back(std::move(filter_config_provider));
}

二、消费者部分的逻辑

事件响应函数,在触发onFileEvent之后,有一个环节会调用createFilterChain()去消费filter_factories中的filterfactorycb函数,并通过这些已经注册好的cb函数,将http filter添加到decoder_filter 或者encoder_filter中。

代码语言:javascript
复制
ConnectionImpl::onFileEvent()-→
.......
------>
Envoy::StatusOr<ParserStatus> ServerConnectionImpl::onHeadersCompleteBase() {
......
  active_request.request_decoder_->decodeHeaders(std::move(headers), false);
......
}
----→ 
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers, bool end_stream) {
......
  const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
......
}
---→ 
bool FilterManager::createFilterChain() {
......
  if (upgrade != nullptr) {
    const Router::RouteEntry::UpgradeMap* upgrade_map = filter_manager_callbacks_.upgradeMap();
    if (filter_chain_factory_.createUpgradeFilterChain(upgrade->value().getStringView(),
        upgrade_map, *this)) {
        filter_manager_callbacks_.upgradeFilterChainCreated();
        return true;
    } else {
        upgrade_rejected = true;
        // Fall through to the default filter chain. The function calling this
        // will send a local reply indicating that the upgrade failed.
   }
 }
filter_chain_factory_.createFilterChain(*this);
......
}
----→ 
HttpConnectionManagerConfig::createFilterChain()
HttpConnectionManagerConfig::createUpgradeFilterChain() 
----→ 
HttpConnectionManagerConfig::createFilterChainForFactories() {
 ......
   for (const auto& filter_config_provider : filter_factories) {
      auto config = filter_config_provider->config();
      if (config.has_value()) {
       // 这里对应的是http_filter 创建的工厂调用函数里面的FilterFactoryCb函数
          config.value()(callbacks);
          continue;
       }
......
}
-----→
以BandwidthLimitFilterConfig 为例子,config.value()(callbacks) 对应的是:
[filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void { // 这里的callbacks对应的是fiter_manager
      callbacks.addStreamFilter(std::make_shared<BandwidthLimiter>(filter_config))
} //callback对应的是fiter_manager,所以这里调用的是下面的函数:
----→ 
void addStreamFilter(StreamFilterSharedPtr filter) override {
    addStreamDecoderFilterWorker(filter, nullptr, true);
    addStreamEncoderFilterWorker(filter, nullptr, true);
    StreamDecoderFilter* decoder_filter = filter.get();
    filters_.push_back(decoder_filter);
}
---→
LinkedList::moveIntoListBack(std::move(wrapper), decoder_filters_); 或者
LinkedList::moveIntoList(std::move(wrapper), encoder_filters_);或者同时添加进去。

三、encoder_filter和decoder_filter的消费逻辑

这里的入口都是通过libevent里面的消息响应事件,关联到读和写的相应函数,最终从这两个列表里面依次取出对应的http filter进而执行相应的filter里面的功能,达到使用filter chain的目的。

消费encoder_filters_ 的流程:

代码语言:javascript
复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→ 
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& response_headers, bool end_stream) 
void encodeData(Buffer::Instance& data, bool end_stream) 
void encodeTrailers(ResponseTrailerMap& trailers)
----→ 
void FilterManager::encodeHeaders(ActiveStreamEncoderFilter* filter, ResponseHeaderMap& headers, bool end_stream)  
void encodeData(Buffer::Instance& data, bool end_stream)
void encodeTrailers(ResponseTrailerMapPtr&& trailers)
----→ 会去遍历encoder_filters去依次执行对应的
encodeHeaders()\encodeData()\encodeTrailers()函数

消费decoder_filters_的流程:

代码语言:javascript
复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() 
ParserStatus ClientConnectionImpl::onMessageCompleteBase()
----→  
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
      bool end_stream)
void decodeData(Buffer::Instance& data, bool end_stream)
void decodeTrailers(RequestTrailerMapPtr&& trailers)
---→ 
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream) 
void FilterManager::decodeData(ActiveStreamDecoderFilter* filter, Buffer::Instance& data,
bool end_stream,
FilterIterationStartState filter_iteration_start_state)
void FilterManager::decodeTrailers(ActiveStreamDecoderFilter* filter, RequestTrailerMap& trailers)
--→ 会去遍历decoder_filters去依次执行对应的
decodeHeaders()\decodeData()\decodeTrailers()函数

补充代码信息:下面是onMessageCompleteBase函数在ServerConnectionImpl和ClientConnectionImpl中的详细代码信息

代码语言:javascript
复制
ParserStatus ServerConnectionImpl::onMessageCompleteBase() {
  ASSERT(!handling_upgrade_);
  if (active_request_.has_value()) {
    auto& active_request = active_request_.value();
    if (active_request.request_decoder_) {
      active_request.response_encoder_.readDisable(true);
    }
    active_request.remote_complete_ = true;
    if (deferred_end_stream_headers_) {
      active_request.request_decoder_->decodeHeaders(
          std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      active_request.request_decoder_->decodeTrailers(
          std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      active_request.request_decoder_->decodeData(buffer, true);
    }
    // Reset to ensure no information from one requests persists to the next.
    headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
  }
  // Always pause the parser so that the calling code can process 1 request at a time and apply
  // back pressure. However this means that the calling code needs to detect if there is more data
  // in the buffer and dispatch it again.
  return parser_->pause();
}
  
ParserStatus ClientConnectionImpl::onMessageCompleteBase() {
  ENVOY_CONN_LOG(trace, "message complete", connection_);
  if (ignore_message_complete_for_1xx_) {
    ignore_message_complete_for_1xx_ = false;
    return ParserStatus::Success;
  }
  if (pending_response_.has_value()) {
    ASSERT(!pending_response_done_);
    // After calling decodeData() with end stream set to true, we should no longer be able to reset.
    PendingResponse& response = pending_response_.value();
    // Encoder is used as part of decode* calls later in this function so pending_response_ can not
    // be reset just yet. Preserve the state in pending_response_done_ instead.
    pending_response_done_ = true;
  
    if (deferred_end_stream_headers_) {
      response.decoder_->decodeHeaders(
          std::move(absl::get<ResponseHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      response.decoder_->decodeTrailers(
          std::move(absl::get<ResponseTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      response.decoder_->decodeData(buffer, true);
    }
  
    // Reset to ensure no information from one requests persists to the next.
    pending_response_.reset();
    headers_or_trailers_.emplace<ResponseHeaderMapPtr>(nullptr);
  }
  // Pause the parser after a response is complete. Any remaining data indicates an error.
  return parser_->pause();
}

参考文档:https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/http/http_filters#arch-overview-http-filters

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-04-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 灰子学技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档