首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为s3卡夫卡连接接收器实现自定义分区

为s3卡夫卡连接接收器实现自定义分区
EN

Stack Overflow用户
提问于 2022-01-30 22:38:43
回答 1查看 41关注 0票数 0

我希望实现一个自定义的s3分区器类,以包含一些avro消息字段和一些额外的逻辑来生成输出s3路径前缀。

这个项目在科特林,这是我的班级:

代码语言:javascript
运行
复制
package co.kafkaProcessor.connect

import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class MachineAwareHourlyPartitioner<T> : TimeBasedPartitioner<T>() {
    private val log: Logger = LoggerFactory.getLogger(MachineAwareHourlyPartitioner::class.java)
    private lateinit var environmentName: String

    override fun configure(config: MutableMap<String, Any>?) {
        super.configure(config)
        environmentName = config!!["environment.prefix"] as String
    }

    private fun encodedPartitionForTimestamp(sinkRecord: SinkRecord, timestamp: Long?): String? {
        // Our custom logic goes here
    }
}

首先,我尝试创建一个自定义shadowJar任务来生成Jar文件:

代码语言:javascript
运行
复制
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        include("co/kafkaProcessor/connect/**")
        include("co/kafkaProcessor/serializer/**")
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}

但是做jar -tvf filename.jar表明它只包含了我自己的代码,卡夫卡连接java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.TimeBasedPartitioner失败了。我认为您不应该在自定义jar中包含kakfa连接代码,也是因为如果我尝试用TimeBasedPartitioner配置任务,它就可以工作,这样类就可用了。

然后,通过将自定义jar定义更改为:

代码语言:javascript
运行
复制
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for kafka connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        dependencies {
            include(dependency("io.confluent:kafka-connect-storage-partitioner:10.2.4"))
        }
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}

不幸的是,这包括了我所有的应用程序代码,但是我可以看到jar文件中包含了这个分区程序。

Kafka connect现在失败了,出现了以下错误:

代码语言:javascript
运行
复制
java.lang.ClassCastException: class co.kafkaProcessor.connect.MachineAwareHourlyPartitioner cannot be cast to class io.confluent.connect.storage.partitioner.Partitioner (co.kafkaProcessor.connect.MachineAwareHourlyPartitioner is in unnamed module of loader 'app'; io.confluent.connect.storage.partitioner.Partitioner is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @63a6dffd)
        at io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:196)
        at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:117)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

更新:我还试图通过重写公共方法encodePartition来改变函数的覆盖方式,但是没有改变。

我还尝试添加了这样的测试(希望能够尝试向Partitioner`转换):

代码语言:javascript
运行
复制
val partitioner = MachineAwareHourlyPartitioner<String>()
val implementedPartitioner = partitioner as Partitioner<String>

这并没有失败

EN

Stack Overflow用户

回答已采纳

发布于 2022-01-31 01:01:22

通过将jar文件(没有任何包含的依赖项)添加到s3连接器目录中,我能够让分区程序工作:

代码语言:javascript
运行
复制
/usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/

我不确定这是否与插件隔离有关,这种隔离避免了不同插件库之间的相互干扰,但在我最初的尝试中,我在主类路径/usr/share/java/kafka/中使用了插件,我认为它可以供每个插件使用。

作为额外的细节,我们还使用另一个自定义类来覆盖在TopicNameStrategy连接器文件夹中没有工作的avro s3,我不得不在/usr/share/java/kafka/目录中复制jar来修复这个问题,不知道为什么其中一个在全局文件夹中工作,另一个不工作。

票数 0
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70919548

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档