我正在尝试用表API和elasticsearch作为接收器创建一个pyflink应用程序。
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")
sink_ddl = """
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
)
"""
t_env.execute_sql(sink_ddl)
print(sink_ddl)
sink_table = t_env.sql_query("SELECT * FROM myUserTable")
if __name__ == '__main__':
log_processing()当我试图运行上述代码时,显示以下错误:
org.apache.flink.table.api.ValidationException:找不到在类路径中实现'org.apache.flink.table.factories.DynamicTableFactory‘的标识符'elasticsearch-7’的工厂。
可用的工厂标识符是:
卡夫卡
在org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319) at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)\
如何解决这个问题。
发布于 2022-04-27 02:02:40
您能否再次检查路径file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1是否真的存在。它似乎缺少了.jar后缀。
https://stackoverflow.com/questions/71676047
复制相似问题