
How do I connect to a MinIO filesystem from Flink?

Stack Overflow user
Asked 2022-07-26 02:52:18
Answers: 1 · Views: 237 · Followers: 0 · Votes: 1

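I am trying to build a data pipeline with Flink and MinIO as the storage layer. I can already write data to a MinIO bucket successfully, but when I create a table WITH a path that points at a MinIO file and then query it, I always get a Connection refused error: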

Flink SQL> CREATE TABLE WordCountTable (
>   word STRING,
>   `count` INT
> )  WITH (
>   'connector' = 'filesystem',         
>   'path' = 's3://test/wordcount2', 
>   'format' = 'csv',     
>   'csv.field-delimiter'=' '
> );
[INFO] Execute statement succeed.

Flink SQL> select * from WordCountTable;
[ERROR] Could not execute SQL statement. Reason:
java.net.ConnectException: Connection refused

I tried searching for it, and the only useful post I found is https://github.com/fhueske/flink-sql-demo (see its MinIO section), but it is outdated.

Here is the Docker Compose file:

version: '3'
services:
  minio:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_storage:/data
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
    command: server --console-address ":9001" /data

  jobmanager:
    image: flink:1.15.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        
        state.backend: filesystem
        state.checkpoints.dir: s3://state/checkpoint
        s3.endpoint: http://minio:9000
        s3.path.style.access: true
        s3.access-key: minio
        s3.secret-key: minio123

  taskmanager:
    image: flink:1.15.0-scala_2.12
    links:
      - jobmanager
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2        
        state.backend: filesystem
        state.checkpoints.dir: s3://state/checkpoint
        s3.endpoint: http://minio:9000
        s3.path.style.access: true
        s3.access-key: minio
        s3.secret-key: minio123

  sql-client:
    image: flink:1.15.0-scala_2.12
    command: bin/sql-client.sh
    links:
      - jobmanager
    depends_on:
      - jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: jobmanager

volumes:
  minio_storage: { }

Thanks in advance.

UPDATE: Today I verified network connectivity with ping and nc, and everything appears to be fine:

root@0e452dd7385e:/usr/bin# ping jobmanager
PING jobmanager (192.168.128.3) 56(84) bytes of data.
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=1 ttl=64 time=3.39 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=2 ttl=64 time=0.193 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=3 ttl=64 time=0.339 ms
64 bytes from flink-iceberg-minio-jobmanager-1.flink-iceberg-minio_default (192.168.128.3): icmp_seq=4 ttl=64 time=0.186 ms
root@0e452dd7385e:/usr/bin# nc -zv jobmanager 6123
Connection to jobmanager (192.168.128.3) 6123 port [tcp/*] succeeded!

However, I found a Connection refused: /0.0.0.0:8081 error in the SQL client log:

2022-07-28 06:44:16,870 WARN  org.apache.flink.client.program.rest.RestClusterClient       [] - Attempt to submit job 'collect' (80b7f32d13c2e3f1deeee4db3df6b923) to 'http://0.0.0.0:8081' has failed.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
  at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
  at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:476) ~[flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) [flink-dist-1.15.0.jar:1.15.0]
  at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) [flink-dist-1.15.0.jar:1.15.0]

What confuses me is why the Flink SQL client is trying to connect to 0.0.0.0:8081. Why doesn't it connect to jobmanager:8081?
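
One possible explanation, offered as an assumption rather than a confirmed diagnosis: the SQL client takes the REST endpoint from rest.address in its own Flink configuration, and the sql-client service in the Compose file sets no FLINK_PROPERTIES, so it may fall back to the address in the image's default configuration. The Docker Compose example for the SQL client in the Flink documentation sets rest.address to the JobManager service name. A minimal sketch of that change, reusing the service names from the file above:

```yaml
  sql-client:
    image: flink:1.15.0-scala_2.12
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        # Assumption: pointing the client's REST endpoint at the jobmanager
        # service replaces the 0.0.0.0:8081 target seen in the log.
        rest.address: jobmanager
```

Whether this alone resolves the failing SELECT is not verified here; it only targets the address the client submits jobs to.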


1 Answer

Stack Overflow user

Answered 2022-09-20 14:53:17

Did you remember to add the S3 filesystem plugin (the Hadoop or the Presto one)?

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
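
As a hedged sketch of what adding the plugin could look like in a Compose setup like the one in the question: the official Flink Docker image supports an ENABLE_BUILT_IN_PLUGINS environment variable that enables plugin jars shipped in the image's opt directory. The jar name below assumes the Hadoop S3 plugin and a version matching the flink:1.15.0 image; it would need adjusting for another version or for the Presto plugin.

```yaml
  jobmanager:
    image: flink:1.15.0-scala_2.12
    command: jobmanager
    environment:
      # Assumption: the jar version matches the image tag (1.15.0).
      - ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.15.0.jar
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        s3.endpoint: http://minio:9000
        s3.path.style.access: true
        s3.access-key: minio
        s3.secret-key: minio123
```

The same variable would presumably be needed on the taskmanager service as well, since the task managers are the processes that read from and write to s3:// paths.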

Votes: 0
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/73117250
