目前我正在用Mysql处理Elastic Stack,像mysql数据库中的数据一样正常工作的一切都可以使用Logstash在Elastic-search上使用,但是当新数据输入到mysql数据库中时,我需要重新启动Logstash,或者可以使用Logstash的配置文件中的调度来完成
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
# The user we wish to execute our statement as
jdbc_user => "root"
jdbc_password => "ankit"
# The path to our downloaded jdbc driver
jdbc_driver_library => "/home/ankit/Downloads/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#run logstash at an interval of on minute
#schedule => "* * * * * *"
# our query
statement => "SELECT * FROM ghijkl"
}
jdbc {
jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
# The user we wish to execute our statement as
jdbc_user => "root"
jdbc_password => "ankit"
# The path to our downloaded jdbc driver
jdbc_driver_library => "/home/ankit/Downloads/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
#run logstash at an interval of on minute
#schedule => "* * * * * *"
# our query
statement => "SELECT * FROM abcdef"
}
}
但是这不是一个好的方法,我正在考虑使用web-hooks,但是没有可用的资源,我尝试了文档页面上的Logstash HTTP输入插件,但是没有帮助。
请帮帮忙。
发布于 2018-05-28 19:42:29
您可以使用特殊查询仅下载最近的数据,比如说每15分钟下载一次:
SELECT * FROM ghijkl" WHERE EVENT_TIME_OCCURRENCE_FIELD > :sql_last_value
将插入最近记录的时间戳来代替:sql_last_value
。当查询第一次运行时,tracking_column
值被设置为01.01.1970
。
Logstash所需配置:
schedule => "*/15 * * * *"
use_column_value => true
tracking_column => 'EVENT_TIME_OCCURRENCE_FIELD'
对于每个输入,您还应该指定last_run_metadata_path
参数,以避免将来出现问题,当您有许多输入,其中一些使用相同的表但不同的模式时,元数据可能会被覆盖并产生意外的结果。
last_run_metadata_path => "PATH_TO_FILE_FOR_META_DATA"
https://stackoverflow.com/questions/50563315
复制相似问题