我正在使用flink消费kafka并写入redis。
下面是我对redis的接收器函数:
.addSink(new RichSinkFunction<MobilePageEvent>() {
@Override
public void invoke(MobilePageEvent event, Context context) {
JEDIS_CLUSTER.zadd(..);
}
})
.name("redis sink");
虽然我可以从redis命令行中获取数据,但指标显示汇函数的输出为零:
如何增加此指标?
发布于 2019-03-01 04:40:42
numRecordsIn和numRecordsOut指标仅计算在Flink作业本身内流动的流记录,不包括与外部系统的通信。因此,换句话说,源不会报告任何记录传入,而汇点不会报告任何记录传出。
在我看来,你有几个选择:
添加计数器度量的模式如here所示。
在redis接收器的情况下,您可以在open()方法中初始化一个计数器,并在invoke()中递增它。但这似乎是没有意义的,因为这只是反映了numRecordsIn指标。如果您的redis接收器正在执行缓冲的批量写操作,那么等到数据实际发送到redis之后再递增度量可能更有意义--在这种情况下,您可能更愿意使用Meter而不是计数器。
https://stackoverflow.com/questions/54928734
复制相似问题