发布
社区首页 >问答首页 >Kafka连接S3源代码抛出java.io.IOException

Kafka连接S3源代码抛出java.io.IOException
EN

Stack Overflow用户
提问于 2022-06-20 10:05:26
回答 1查看 80关注 0票数 0

Kafka S3源连接器在读取S3桶时抛出以下异常:

代码语言:javascript
代码运行次数:0
复制
Caused by: java.io.IOException: Attempted read on closed stream.
    at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:107)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:133)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)

该错误之前有以下警告:

代码语言:javascript
代码运行次数:0
复制
WARN Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use. (com.amazonaws.services.s3.internal.S3AbortableInputStream:178)

我运行卡夫卡连接出这个图像:confluentinc/cp-kafka-connect-base:6.2.0。使用confluentinc-kafka-connect-s3-source-2.1.1 jar。

我的源连接器配置如下:

代码语言:javascript
代码运行次数:0
复制
{
    "connector.class":"io.confluent.connect.s3.source.S3SourceConnector",
    "tasks.max":"1",
    "s3.region":"eu-central-1",
    "s3.bucket.name":"test-bucket-yordan",
    "topics.dir":"test-bucket/topics",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class":"io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility":"NONE",
    "confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
    "transforms":"AddPrefix",
   "transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.AddPrefix.regex":".*",
    "transforms.AddPrefix.replacement":"$0_copy"
}

对这个问题有什么想法吗?我也找不到卡夫卡连接S3源连接器的存储库,它是开源的吗?

编辑:我不认为如果gzip压缩卡夫卡-连接接收器是禁用的问题。

EN

回答 1

Stack Overflow用户

发布于 2022-06-20 22:48:43

该警告意味着在读取文件之前调用了close()。S3没有完成发送数据的工作,但是连接被挂起。

2个备选方案:

  1. 验证输入流没有包含更多的数据。通过这种方式,可以重用连接
  2. Call s3ObjectInputStream.abort() (注意:如果您中止输入流,并且需要创建一个新的连接,则无法重用该连接,这将对性能产生影响)。在某些情况下,这可能是有意义的,例如当读取速度太慢时,等等。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72685360

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档