我有下面的logstash输出插件,其中有一个条件,在启动管道时会产生来自logstash的错误。
卡夫卡输出过滤器支持条件词吗?
output {
kafka {
id => "plugin_SharedAlarmCreated"
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
codec => json
topic_id => "Shared.Event.AlarmCreated"
if "null-value" in [tags] {
message_key => "%{Source}+%{Id}+NULL"
}
else {
message_key => "%{Source}+%{Id}+%{SourceId}"
}
}
}
2022-10-15T16:37:35,971 logstash.agent未能执行{:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm,:exception=>"LogStash::ConfigurationError",:message=>的操作。在输出{r\n id => "plugin_SharedAlarmCreated"\r\n bootstrap_servers => " kafka -1:9092,kafka-2:9092,kafka-2:9092,kafka-2:9092,卡夫卡-3:9092“\r\n编解码器=> json\r\n topic_id => "Shared.Event.AlarmCreated"\r\n如果", compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in
initialize‘,"org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
初始化’,"/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in
块在converge_state'"}
发布于 2022-10-15 17:37:53
不,您不能在插件定义中使用条件。可以在输出部分中使用条件。
output {
if "null-value" in [tags] {
kafka {
id => "plugin_SharedAlarmCreated1"
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
codec => json
topic_id => "Shared.Event.AlarmCreated"
message_key => "%{Source}+%{Id}+NULL"
}
} else {
kafka {
id => "plugin_SharedAlarmCreated2"
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
codec => json
topic_id => "Shared.Event.AlarmCreated"
message_key => "%{Source}+%{Id}+%{SourceId}"
}
}
}
但是,如果您有多个输出,那么您将有多个到kafka的连接,所以最好在filter部分中使用条件。
filter {
if "null-value" in [tags] {
mutate { add_field => { [@metadata][message_key] => "%{Source}+%{Id}+NULL" } }
}
else {
mutate { add_field => { [@metadata][message_key] => "%{Source}+%{Id}+%{SourceId}" } }
}
}
output {
kafka {
id => "plugin_SharedAlarmCreated"
bootstrap_servers => "kafka-1:9092,kafka-2:9092,kafka-3:9092"
codec => json
topic_id => "Shared.Event.AlarmCreated"
message_key => "%{[@metadata][message_key]}"
}
}
发布于 2022-10-15 19:24:37
我已经尝试了你的第一个建议,但由于重复的plugin_id错误而失败了。我想我可以使用两个不同的plugin_id值,但我更喜欢你的第二个建议。我试过了但还是出了个错误。
我有红宝石过滤器,创建空值标签,。这是我的完整过滤部分。
filter {
ruby {
code => "if event.get('SourceId').nil?; event.set('tags','null-value');end"
}
if "null-value" in [tags] {
[@metadata][message_key] => "%{Source}+%{Id}+NULL"
}
else {
[@metadata][message_key] => "%{Source}+%{Id}+%{SourceId}"
}
}
2022-10-15T18:46:54,491 logstash.agent未能执行{:action=>LogStash::PipelineAction::Create/pipeline_id:shared-pipeline-create-alarm,:exception=>的操作“LogStash::ConfigurationError”,:message=>“”期望在第24行中出现\t\r\n、"#“、"if”、A-Za-z0-9_-、“‘’、‘’、"}”中的一个。在筛选器{\r\n ruby {\r\n代码=> "if event.get(‘SourceId’).nil?“之后,列5(字节560);Event.set(‘标签’,‘空值’);如果标签{\r\n中的“空值”为“空值”,则结束“\r\n}\r\n”,如果标记{\r\n中为“空值”,则为“:backtrace=>"/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:189:in
initialize'","org/logstash/execution/JavaBasePipelineExt.java:72:in initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:47:in
initialize‘’,"/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:50:in execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:376:in
块in converge_state'"} }
第24行是"if“吐露的结尾大括号。
https://stackoverflow.com/questions/74081380
复制相似问题