The Kafka S3 Source connector throws the following exception while reading from an S3 bucket:
Caused by: java.io.IOException: Attempted read on closed stream.
at org.apache.http.conn.EofSensorInputStream.isReadAllowed(EofSensorInputStream.java:107)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:133)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
The exception is preceded by this warning:
WARN Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use. (com.amazonaws.services.s3.internal.S3AbortableInputStream:178)
I run Kafka Connect from the image confluentinc/cp-kafka-connect-base:6.2.0, with the confluentinc-kafka-connect-s3-source-2.1.1 jar.
My source connector configuration is as follows:
{
  "connector.class": "io.confluent.connect.s3.source.S3SourceConnector",
  "tasks.max": "1",
  "s3.region": "eu-central-1",
  "s3.bucket.name": "test-bucket-yordan",
  "topics.dir": "test-bucket/topics",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
  "schema.compatibility": "NONE",
  "confluent.topic.bootstrap.servers": "blockchain-kafka-kafka-0.blockchain-kafka-kafka-headless.default.svc.cluster.local:9092",
  "transforms": "AddPrefix",
  "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.AddPrefix.regex": ".*",
  "transforms.AddPrefix.replacement": "$0_copy"
}
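As an aside, the RegexRouter transform in the config above matches each topic name against the regex `.*` and substitutes `$0_copy`, where `$0` is the whole match. The renaming it performs can be reproduced with plain java.util.regex (the topic name `test-topic` below is just an illustrative example):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexRouterDemo {
    public static void main(String[] args) {
        // Same regex and replacement as the AddPrefix transform in the config
        Pattern regex = Pattern.compile(".*");
        String replacement = "$0_copy";

        String topic = "test-topic"; // hypothetical source topic name
        Matcher m = regex.matcher(topic);
        String routed = m.matches() ? m.replaceFirst(replacement) : topic;

        System.out.println(routed); // prints "test-topic_copy"
    }
}
```

Note that despite the alias "AddPrefix", the replacement `$0_copy` actually appends a `_copy` suffix to every matched topic name.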
Any ideas what might be causing this? Also, I cannot find a repository for the Kafka Connect S3 Source connector; is it open source?
Edit: I don't think the problem is gzip compression, since it is disabled in the kafka-connect sink.
Posted on 2022-06-20 22:48:43
The warning means that close() was called before the file was fully read: S3 has not finished sending the data, but the connection is left dangling.

There are two alternatives:

1. Drain the input stream, i.e. read the remaining bytes before closing, so the underlying HTTP connection can be returned to the pool and reused.
2. Call s3ObjectInputStream.abort() (note: if you abort the input stream, the connection cannot be reused and a new one has to be created, which has a performance impact). This can still make sense in some cases, for example when reading the rest of the object would be too slow.

https://stackoverflow.com/questions/72685360
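The drain option can be sketched with a plain InputStream. A ByteArrayInputStream stands in for the real S3ObjectInputStream here; with the AWS SDK you would apply the same loop to s3Object.getObjectContent() before calling close(), or call abort() instead:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class DrainDemo {
    // Read and discard everything left on the stream so that closing it
    // does not abort the HTTP response mid-body.
    static long drain(InputStream in) throws IOException {
        byte[] buf = new byte[8192];
        long discarded = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            discarded += n;
        }
        return discarded;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for s3Object.getObjectContent(): 100 bytes "from S3"
        InputStream in = new ByteArrayInputStream(new byte[100]);

        in.read(new byte[10]);         // consume only part of the object
        long discarded = drain(in);    // drain the rest before closing
        in.close();                    // close() no longer drops unread data

        System.out.println(discarded); // prints 90
    }
}
```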