专栏首页大数据进阶flink系列(5)-kafka源码分析

flink系列(5)-kafka源码分析

最近一直在弄flink sql相关的东西,第一阶段的目标是从解决kafka的消费和写入的问题。不过也有些同学并不是很了解,今天我们来详细分析一下包的继承层次。

flink源码如下:

public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row>{

    private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>();

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE);
        context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION));
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add(KafkaConnectorDescriptor.DATABASE_KEY);
        properties.add(KafkaConnectorDescriptor.TABLE_KEY);
        return properties;
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        //避免频繁的触发 是否需要加缓存
        KafkaTableSource kafkaTableSource;
        String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY);
        String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY);
        if (!kafkaTableSources.containsKey(dataBase + table)) {
            Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder();
            kafkaTableSource = builder
                    .cluster(dataBase)
                    .subject(table)
                    .build();
            kafkaTableSources.put(dataBase + table,kafkaTableSource);
        } else {
            kafkaTableSource = kafkaTableSources.get(dataBase + table);
        }
        return kafkaTableSource;
    }

}
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(schema, topic, properties, new PBRowDeserializationSchema(typeInformation, paramMap,entryClass)) {

  override def createKafkaConsumer(topic: String, properties: Properties, deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    new FlinkKafkaConsumer08(topic, deserializationSchema, properties).setStartFromEarliest()

  }
}

下面用户自定义的kafka的sink类:

class Kafka08UDMPBTableSink (topic: String,
                              properties: Properties,
                              partitioner: Optional[FlinkKafkaPartitioner[Row]],
                              paramMap: util.LinkedHashMap[String, AnyRef],
                              serializationSchema: SerializationSchema[Row],
                              fieldNames: Array[String],
                              fieldTypes: Array[TypeInformation[_]]
                            ) extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  override def createKafkaProducer(topic: String, properties: Properties, serializationSchema: SerializationSchema[Row], partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row]={
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, partitioner.orElse(new FlinkFixedPartitioner[Row]))
  }

  override def createSerializationSchema(rowSchema: RowTypeInfo) = serializationSchema

  override def createCopy = new Kafka08UDMPBTableSink(topic, properties, this.partitioner, paramMap, serializationSchema, fieldNames, fieldTypes)

  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  override def getFieldNames: Array[String]=this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]]=this.fieldTypes


  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }

}
public class TrackRowDeserializationSchema implements SerializationSchema<Row>, DeserializationSchema<Row> {
    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo = null;

    private LinkedHashMap paraMap;

    private String inSchema;
    private String outSchema;
    private String inClass;
    private String outClass;
}
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    public TrackRowFormatFactory() {
        super(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    @Override
    protected List<String> supportedFormatProperties() {
        final List<String> properties = new ArrayList<>();
        properties.add(TrackValidator.FORMAT_IN_SCHEMA);
        properties.add(TrackValidator.FORMAT_IN_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_CLASS);
        properties.add(TrackValidator.FORMAT_OUT_SCHEMA);
        properties.add(TrackValidator.FORMAT_TYPE_INFORMATION);
        properties.add(TrackValidator.FORMAT_TYPE_VALUE);
        return properties;
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • storm(3)-任务提交

    本地模式其实其实使用的是127.0.0.1,如果在storm集群上,借助storm jar则使用的是storm.yaml中的配置

    yiduwangkai
  • 【译】A Deep-Dive into Flink's Network Stack(2)

    对于下图所示的示例,我们将假设4的并行性和具有两个任务管理器的部署,每个任务管理器提供2个插槽。 TaskManager 1执行子任务A.1,A.2,B.1和B...

    yiduwangkai
  • scala(1):基础知识

    yiduwangkai
  • RxJava2 多线程

    subscribeOn这个操作符指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。

    剑行者
  • 小程序(3):授权登录

    判断是否授权,如果没有,则显示授权按钮。注意上面的open-type="getUserInfo",这个会自动调起授权框。看一下js

    小尘哥
  • 【小家Spring】Spring中@PropertySource和@ImportResource的区别,以及各自的实现原理解析

    @PropertySource和@ImportResource或许很多人都用过,并且都还没有用错。但是若真把他俩拿过来一起的时候,却有点傻傻分不清楚了。

    YourBatman
  • 手把手教你实现JWT Token

    Json Web Token (JWT) 近几年是前后端分离常用的 Token 技术,是目前最流行的跨域身份验证解决方案。你可以通过文章 一文了解web无状态会...

    码农小胖哥
  • java模板引擎替换实现(附代码)

    挨踢小子部落阁
  • 异常的分类图解

        Error:走到半路上,发生山路塌陷,或者出现了泥石流,这个问题很严重,不是班长能够立马解决的。

    黑泽君
  • 探索 Java 隐藏的开销--私有方法调用莫瞎写

    随着 Android 引入 Java 8 的一些功能,请记住每一个标准库的 API 和语言特性都会带来一些相关的开销,这很重要。虽然设备越来越快而且内存越来越多...

    wust小吴

扫码关注云+社区

领取腾讯云代金券