
pyflink TableException: Failed to execute sql

Stack Overflow user
Asked on 2022-01-28 03:43:47
2 answers · 626 views · 0 followers · 0 votes

I am running a Flink streaming job with PyFlink. It works when I run Flink in standalone mode, but when I run Flink in YARN per-job mode it fails with "pyflink.util.exceptions.TableException: Failed to execute sql".

The YARN per-job command is: flink run -t yarn-per-job -Djobmanager.memory.process.size=1024mb -Dtaskmanager.memory.process.size=2048mb -ynm flink-cluster -Dtaskmanager.numberOfTaskSlots=2 -pyfs cluster.py ...

The standalone command is: flink run -pyfs cluster.py ...

The Python environment archive is added inside cluster.py.

env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

curr_path = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
jars = f"""
file://{curr_path}/jars/flink-sql-connector-kafka_2.11-1.13.1.jar;
file://{curr_path}/jars/force-shading-1.13.1.jar"""
    
t_env.get_config().get_configuration().set_string("pipeline.jars", jars)
t_env.add_python_archive("%s/requirements/flink.zip" % curr_path)
t_env.get_config().set_python_executable("flink.zip/flink/bin/python")
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(2)
env.get_config().set_auto_watermark_interval(10000)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
parse_log = udaf(LogParser(parsing_params),
                 input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
                              DataTypes.STRING(), DataTypes.TIMESTAMP(3)],
                 result_type=DataTypes.STRING(), func_type="pandas")
process_ad = udf(ADProcessor(ad_params), result_type=DataTypes.STRING())

t_env.create_temporary_function('log_parsing_process', parse_log)
t_env.create_temporary_function('ad_process', process_ad)

tumble_window = Tumble.over("5.minutes").on("time_ltz").alias("w")

t_env.execute_sql(f"""
            CREATE TABLE source_table(
                ip VARCHAR,               -- ip address
                raws VARCHAR,             -- message
                host VARCHAR,             -- host
                log_type VARCHAR,         -- type
                system_name VARCHAR,      -- system
                ts BIGINT,
                time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
                WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
            ) WITH (
                'connector' = 'kafka',
                'topic' = '{source_topic}',
                'properties.bootstrap.servers' = '{source_servers}',
                'properties.group.id' = '{group_id}',
                'scan.startup.mode' = '{auto_offset_reset}',
                'format' = 'json'
            )
            """)

sink_sql = f"""
        CREATE TABLE sink (
            alert VARCHAR,           -- alert
            start_time timestamp(3), -- window start timestamp
            end_time timestamp(3)    -- window end timestamp
        ) with (
            'connector' = 'kafka',
            'topic' = '{sink_topic}',
            'properties.bootstrap.servers' = '{sink_servers}',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        )"""

t_env.execute_sql(sink_sql)

t_env.get_config().set_null_check(False)

source_table = t_env.from_path('source_table')
sink_table = source_table.window(tumble_window) \
        .group_by("w, log_type") \
        .select("log_parsing_process(ip, raws, host, log_type, system_name, time_ltz) AS pattern, "
                "w.start AS start_time, "
                "w.end AS end_time") \
        .select("ad_process(pattern, start_time, end_time) AS alert, start_time, end_time")

sink_table.execute_insert("sink")
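One thing worth double-checking in the code above: the `pipeline.jars` value is built from a triple-quoted f-string, so it contains a newline and leading spaces around the semicolon, which the jar-URL parsing may not tolerate. A minimal sketch of building the same value without stray whitespace (the paths and jar names below are placeholders mirroring the snippet):

```python
# Build a clean semicolon-separated pipeline.jars value.
# curr_path is a hypothetical project root for illustration.
curr_path = "/opt/myjob"
jar_names = [
    "flink-sql-connector-kafka_2.11-1.13.1.jar",
    "force-shading-1.13.1.jar",
]

# pipeline.jars expects jar URLs separated by ';' with no surrounding
# whitespace or newlines.
jars = ";".join(f"file://{curr_path}/jars/{name}" for name in jar_names)
print(jars)
```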

The error is:

File "/tmp/pyflink/xxxx/xxxx/workerbee/log_exception_detection_run_on_diff_mode.py", line 148, in run_flink
    sink_table.execute_insert("test_sink")
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/pyflink.zip/pyflink/table/table.py", line 1056, in execute_insert
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
File "/opt/flink/flink-1.13.1_scala_2.12/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 163, in deco
pyflink.util.exceptions.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:777)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:742)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1

The log:

INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: launchContainer: [bash, /opt/hadoop_data/tmp/nm-local-dir/usercache/root/appcache/application_I1644370510310_0002/container_I1644370510310_0002_03_000001/default_container_executor.sh]
WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_I1644370510310_0002_03_000001 is : 1
WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_I1644370510310_0002_03_000001 and exit code: 1
ExitCodeException exitCode=1:
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
    at org.apache.hadoop.util.Shell.run(Shell.java:901)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:309)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.launchContainer(ContainerLaunch.java:585)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:373)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:103)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exception from container-launch.
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: container id: container_I1644370510310_0002_03_000001
INFO org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor: Exit code: 1
WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Container launch failed : Container exited with a non-zero exit code 1
INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: Container container_I1644370510310_0002_03_000001 transitioned from RUNNING to EXITED_WITH_FAILURE

2 Answers

Stack Overflow user

Answered on 2022-01-28 08:34:11

This looks like a classloader-related problem. Check the `classloader.check-leaked-classloader` configuration; see https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/
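If it is indeed a classloader issue, the option referred to above can be turned off in flink-conf.yaml (a config sketch for diagnosis; whether disabling it is appropriate depends on your deployment):

```yaml
# flink-conf.yaml: disable the check that raises errors when a
# classloader is used after the job terminated (diagnostic aid, not a fix).
classloader.check-leaked-classloader: false
```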

In addition, you can try the add_jars API instead of setting the pipeline.jars configuration directly.

def add_jars(self, *jars_path: str):
    """
    Adds a list of jar files that will be uploaded to the cluster and referenced by the job.

    :param jars_path: Path of jars.
    """
    add_jars_to_context_class_loader(jars_path)
    jvm = get_gateway().jvm
    jars_key = jvm.org.apache.flink.configuration.PipelineOptions.JARS.key()
    env_config = jvm.org.apache.flink.python.util.PythonConfigUtil \
        .getEnvironmentConfig(self._j_stream_execution_environment)
    old_jar_paths = env_config.getString(jars_key, None)
    joined_jars_path = ';'.join(jars_path)
    if old_jar_paths and old_jar_paths.strip():
        joined_jars_path = ';'.join([old_jar_paths, joined_jars_path])
    env_config.setString(jars_key, joined_jars_path)
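The notable part of add_jars is how it merges new paths into any previously configured pipeline.jars value. That merging logic can be exercised on its own, without a Flink installation (the function name and paths below are made up for illustration):

```python
def merge_jar_paths(old_jar_paths, *jars_path):
    """Mimic how add_jars appends new jars to an existing
    semicolon-separated pipeline.jars value."""
    joined = ";".join(jars_path)
    if old_jar_paths and old_jar_paths.strip():
        joined = ";".join([old_jar_paths, joined])
    return joined

# No previous value: just the new jar.
print(merge_jar_paths(None, "file:///tmp/a.jar"))
# Existing value: new jars are appended after it.
print(merge_jar_paths("file:///tmp/a.jar", "file:///tmp/b.jar", "file:///tmp/c.jar"))
```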
Votes: 0

Stack Overflow user

Answered on 2022-02-14 13:09:45

After debugging and checking, I finally found that the problem was that I was missing some Flink Hadoop jar packages:

commons-cli-1.4.jar
flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar
hadoop-yarn-api-3.3.1.jar
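Since the fix was adding jars, a quick way to verify they are actually present on every node is a small existence check (the helper function and the lib directory path below are assumptions for illustration; the jar names are the ones listed above):

```python
import os

def missing_jars(lib_dir, required):
    """Return the names from `required` that are not found in lib_dir."""
    return [j for j in required if not os.path.exists(os.path.join(lib_dir, j))]

required = [
    "commons-cli-1.4.jar",
    "flink-shaded-hadoop-3-uber-3.1.1.7.2.1.0-327-9.0.jar",
    "hadoop-yarn-api-3.3.1.jar",
]

# Assumed Flink lib directory; adjust to your installation.
print(missing_jars("/opt/flink/flink-1.13.1_scala_2.12/lib", required))
```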
Votes: 0
Original page content provided by Stack Overflow; translation supported by Tencent Cloud.
Original link: https://stackoverflow.com/questions/70888574
