我希望实现一个自定义的s3分区器类,以包含一些avro消息字段和一些额外的逻辑来生成输出s3路径前缀。
这个项目在科特林,这是我的班级:
package co.kafkaProcessor.connect
import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.slf4j.Logger
import org.slf4j.LoggerFactory
class MachineAwareHourlyPartitioner<T> : TimeBasedPartitioner<T>() {
private val log: Logger = LoggerFactory.getLogger(MachineAwareHourlyPartitioner::class.java)
private lateinit var environmentName: String
override fun configure(config: MutableMap<String, Any>?) {
super.configure(config)
environmentName = config!!["environment.prefix"] as String
}
private fun encodedPartitionForTimestamp(sinkRecord: SinkRecord, timestamp: Long?): String? {
// Our custom logic goes here
}
}
首先,我尝试创建一个自定义shadowJar任务来生成Jar文件:
tasks {
withType<ShadowJar> {
mergeServiceFiles()
append("META-INF/spring.handlers")
append("META-INF/spring.schemas")
append("META-INF/spring.tooling")
transform(PropertiesFileTransformer::class.java) {
paths = listOf("META-INF/spring.factories")
mergeStrategy = "append"
}
}
// Custom jars for kafka connect
create<ShadowJar>("kafkaConnectUtilsJar") {
archiveClassifier.set("connect-utils")
include("co/kafkaProcessor/connect/**")
include("co/kafkaProcessor/serializer/**")
from(project.sourceSets.main.get().output)
configurations = listOf(project.configurations.runtimeClasspath.get())
}
}
但是做jar -tvf filename.jar
表明它只包含了我自己的代码,卡夫卡连接java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
失败了。我认为您不应该在自定义jar中包含kakfa连接代码,也是因为如果我尝试用TimeBasedPartitioner
配置任务,它就可以工作,这样类就可用了。
然后,通过将自定义jar定义更改为:
tasks {
withType<ShadowJar> {
mergeServiceFiles()
append("META-INF/spring.handlers")
append("META-INF/spring.schemas")
append("META-INF/spring.tooling")
transform(PropertiesFileTransformer::class.java) {
paths = listOf("META-INF/spring.factories")
mergeStrategy = "append"
}
}
// Custom jars for kafka connect
create<ShadowJar>("kafkaConnectUtilsJar") {
archiveClassifier.set("connect-utils")
dependencies {
include(dependency("io.confluent:kafka-connect-storage-partitioner:10.2.4"))
}
from(project.sourceSets.main.get().output)
configurations = listOf(project.configurations.runtimeClasspath.get())
}
}
不幸的是,这包括了我所有的应用程序代码,但是我可以看到jar文件中包含了这个分区程序。
Kafka connect现在失败了,出现了以下错误:
java.lang.ClassCastException: class co.kafkaProcessor.connect.MachineAwareHourlyPartitioner cannot be cast to class io.confluent.connect.storage.partitioner.Partitioner (co.kafkaProcessor.connect.MachineAwareHourlyPartitioner is in unnamed module of loader 'app'; io.confluent.connect.storage.partitioner.Partitioner is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @63a6dffd)
at io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:196)
at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:117)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
更新:我还试图通过重写公共方法encodePartition
来改变函数的覆盖方式,但是没有改变。
我还尝试添加了这样的测试(希望能够尝试向Partitioner`转换):
val partitioner = MachineAwareHourlyPartitioner<String>()
val implementedPartitioner = partitioner as Partitioner<String>
这并没有失败
发布于 2022-01-31 01:01:22
通过将jar文件(没有任何包含的依赖项)添加到s3连接器目录中,我能够让分区程序工作:
/usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/
我不确定这是否与插件隔离有关,这种隔离避免了不同插件库之间的相互干扰,但在我最初的尝试中,我在主类路径/usr/share/java/kafka/
中使用了插件,我认为它可以供每个插件使用。
作为额外的细节,我们还使用另一个自定义类来覆盖在TopicNameStrategy
连接器文件夹中没有工作的avro s3,我不得不在/usr/share/java/kafka/
目录中复制jar来修复这个问题,不知道为什么其中一个在全局文件夹中工作,另一个不工作。
https://stackoverflow.com/questions/70919548
复制相似问题