Fluentd elasticsearch plugin @type elasticsearch_data_stream with dynamic data streams

Stack Overflow user
Asked 2022-07-27 17:09:47
1 answer · 365 views · 0 followers · 1 vote

Update 1: I have made some progress by defining a concrete data_stream_name in the match block. The only thing left is to find a way to make the data stream name dynamic. I am updating the code sample below and marking what I added.

I have an EFK stack and I want to use data streams for index rollover. When I use the configuration below in the output plugin, I get the following errors:

The client is unable to verify that the server is Elasticsearch. Some functionality may not be compatible if the server is running an unsupported product.
2022-07-27 17:02:40 +0000 [warn]: #0 failed to flush the buffer. retry_times=90 next_retry_time=2022-07-27 17:03:12 +0000 chunk="5e4cbcab11eb8e7fd1b93d4aa706fb67" error_class=Fluent::ConfigError error="Failed to create data stream: <logs-abc-def-2022.07.27> Connection refused - connect(2) for 127.0.0.1:9200 (Errno::ECONNREFUSED)"
2022-07-27 17:02:40 +0000 [warn]: #0 suppressed same stacktrace

Update 1: The error above is resolved, and I am changing the question title to reflect the new requirement. Is there a way to create dynamic data streams per namespace? If I use data_stream_name logs-${$.kubernetes.namespace_name} I get the error above, but a concrete name such as logs-all-namespaces works.

<match <pattern1> <pattern2> >
    @type elasticsearch_data_stream
    @log_level info
    prefer_oj_serializer true
    log_es_400_reason true
    include_tag_key true
    tag_key tag_fluentd
    hosts "#{ENV['ELASTICSEARCH_HOSTS']}"
    user "#{ENV['ELASTICSEARCH_USERNAME']}"
    password "#{ENV['ELASTICSEARCH_PASSWORD']}"
    scheme "https"
    ssl_version "TLSv1_2"
    ssl_verify false
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    request_timeout 15s
    logstash_format false
    # logstash_prefix logs-${$.kubernetes.namespace_name}
    time_key time_docker_log
    include_timestamp true
    suppress_type_name true
    template_name "hot-warm-cold-delete-30d"
    ilm_policy_id "hot-warm-cold-delete-30d"
    data_stream_name logs-all-namespaces # update 1: changed from logs-${$.kubernetes.namespace_name} to logs-all-namespaces
    enable_ilm true
    # ilm_policy_overwrite false
    # template_overwrite true
    # template_pattern logs-${$.kubernetes.namespace_name}-*
    # index_name logs-${$.kubernetes.namespace_name}

    <buffer time, tag, $.kubernetes.namespace_name>
        @type file
        timekey 10
        path /data/fluentd-buffers/kubernetes.system.buffer.es
        ## Retrying control
        retry_type exponential_backoff # Specifies how to wait for the next retry to flush buffer. Default
        retry_forever true # Plugin will ignore retry_timeout and retry_max_times options and retry flushing forever.
        retry_max_interval 30 # The maximum interval (seconds) for exponential backoff between retries while failing.
        total_limit_size 512M # The size limitation of this buffer plugin instance. Default 512M
        ## buffering params
    chunk_limit_size 64M # The max size of each chunk: events are written into a chunk
                          # until it reaches this size. Default 8MB
    chunk_limit_records 5000 # The max number of events each chunk can store
    chunk_full_threshold 0.85 # The percentage of chunk size that triggers a flush:
                              # the output plugin flushes a chunk when its actual size
                              # reaches chunk_limit_size * chunk_full_threshold
        # Total size of the buffer (8MiB/chunk * 32 chunk) = 256Mi
        # queue_limit_length 32
        ## flushing params
        flush_thread_count 8 # The number of threads to flush the buffer. Default 1
        flush_interval 5s # The interval between buffer chunk flushes. Default 60
        flush_mode interval # Flushes per flush interval 
        overflow_action block # This mode stops input plugin thread until buffer full issue is resolved
    </buffer>
</match>

# Send pattern3 logs to RabbitMQ
<match <pattern3>>
    @type rabbitmq
    host "#{ENV['RABBITMQ_HOST']}"
    user "#{ENV['RABBITMQ_WRITER_USERNAME']}"
    pass "#{ENV['RABBITMQ_WRITER_PASSWORD']}"
    vhost /
    format json
    exchange raw
    exchange_type direct
    exchange_durable true
    routing_key raw
    timestamp true
    heartbeat 10
    <buffer time, tag, $.kubernetes.namespace_name>
        @type file
        timekey 10
        path /data/fluentd-buffers/kubernetes.system.buffer.rabbitmq
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 4
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 16M
        total_limit_size 512M
        chunk_full_threshold 0.85
        overflow_action block
    </buffer>
</match>

gem list output (relevant libraries only)

elastic-transport (8.0.0)
elasticsearch (8.2.0)
elasticsearch-api (8.2.0)
fluent-plugin-elasticsearch (5.2.2)
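
As an aside, the "unable to verify that the server is Elasticsearch" warning quoted above is the product check performed by the 8.x Ruby client (elastic-transport 8.0.0 in the list above); it usually means the client could not confirm the product, e.g. a 7.x-vs-8.x client/server mismatch or a proxy stripping the X-elastic-product header. One way to check what the cluster actually reports is a plain info request (no special API assumed):

GET /
# the response's "version.number" is the server version;
# with 8.x client gems, a pre-7.14 or 7.x server commonly triggers the product-check warning

If the server turns out to be on 7.x, the usual remedy is pinning the elasticsearch gems to a matching 7.x release so the client and server major versions agree.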

Index template (hot-warm-cold-delete-30d)

{
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "hot-warm-cold-delete-30d"
        },
        "routing": {
          "allocation": {
            "include": {
              "_tier_preference": "data_hot"
            }
          }
        }
      }
    },
    "aliases": {},
    "mappings": {}
  }
}
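
For reference, the JSON above is only the template body. A data stream can only be created if a matching composable index template exists, i.e. one whose index_patterns cover the stream name and which contains a data_stream object; without that, the create-data-stream call is rejected. A minimal sketch of the full template (the index_patterns value and priority here are assumptions, chosen to match the names used above):

PUT _index_template/hot-warm-cold-delete-30d
{
  "index_patterns": ["logs-*"],
  "data_stream": {},
  "priority": 200,
  "template": {
    "settings": {
      "index": {
        "lifecycle": { "name": "hot-warm-cold-delete-30d" },
        "routing": {
          "allocation": {
            "include": { "_tier_preference": "data_hot" }
          }
        }
      }
    }
  }
}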

ILM policy (hot-warm-cold-delete-30d)

{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "5gb",
            "max_age": "10m"
          },
          "set_priority": {
            "priority": 100
          }
        }
      },
      "warm": {
        "min_age": "10m",
        "actions": {
          "set_priority": {
            "priority": 50
          }
        }
      },
      "cold": {
        "min_age": "2d",
        "actions": {
          "set_priority": {
            "priority": 0
          }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": {
          "delete": {
            "delete_searchable_snapshot": true
          }
        }
      }
    }
  }
}
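
Once a stream exists, the rollover/ILM behaviour can be inspected directly on the cluster. A quick sketch, using the concrete stream name from the config above:

GET _data_stream/logs-all-namespaces
# lists the backing indices and the index template the stream matched

GET logs-all-namespaces/_ilm/explain
# shows the current ILM phase/action for each backing index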

Please let me know if more information is needed. I have been at this for a week.


1 Answer

Stack Overflow user
Answered 2022-08-03 13:11:51

Regarding dynamic placeholders, have you tried the solution mentioned in this question and used it for the fluentd plugin's data stream name?

For your use case that would basically be:

<filter **>
  @type record_transformer
  enable_ruby
  <record>
    kuber_namespace ${record["kubernetes"]["namespace_name"]}
  </record>
</filter>
<match <pattern1> <pattern2> >
  @type elasticsearch_data_stream
  data_stream_name logs-${kuber_namespace}
  ... 
  <buffer tag, kuber_namespace>
    .... 
  </buffer>
</match>
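
The reason this works: Fluentd resolves ${...} placeholders in output parameters from the buffer chunk keys, so the nested Kubernetes field has to be copied to a top-level key first and then listed as a chunk key. As a rough illustration (field values are made up), a record like this after the filter:

{
  "log": "some container output",
  "kubernetes": { "namespace_name": "team-a", "pod_name": "app-123" },
  "kuber_namespace": "team-a"
}

ends up in a chunk whose kuber_namespace key is team-a, so data_stream_name logs-${kuber_namespace} resolves to logs-team-a for everything in that chunk.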
2 votes
Original page content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/73142044
