原创

16-Flink-Redis-Sink

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

简介

流式计算中,我们经常有一些场景是消费Kafka数据,进行处理,然后存储到其他的数据库或者缓存或者重新发送回其他的消息队列中。

本文讲述一个简单的Redis作为Sink的案例。

后续,我们会补充完善,比如落入Hbase,Kafka,Mysql等。

关于Redis Sink

Flink提供了封装好的写入Redis的包给我们用,首先我们要新增一个依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>

然后我们实现一个自己的RedisSinkExample:

//指定Redis set
public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.SET, null);
}

public String getKeyFromData(Tuple2<String, Integer> data) {
    return data.f0;
}

public String getValueFromData(Tuple2<String, Integer> data) {
    return data.f1.toString();
}
}

我们用最简单的单机Redis的SET命令进行演示。

完整的代码如下,实现一个读取Kafka的消息,然后进行WordCount,并把结果更新到redis中:

public class RedisSinkTest {

public static void main(String[] args) throws Exception{

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

//连接kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

//实例化FlinkJedisPoolConfig 配置redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setHost("6379").build();
//实例化RedisSink,并通过flink的addSink的方式将flink计算的结果插入到redis

counts.addSink(new RedisSink<>(conf,new RedisSinkExample()));
env.execute("WordCount From Kafka To Redis");

}//
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    String[] tokens = value.toLowerCase().split("\\W+");
    for (String token : tokens) {
        if (token.length() > 0) {
            out.collect(new Tuple2<String, Integer>(token, 1));
        }
    }
}
}
//指定Redis set
public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> {
public RedisCommandDescription getCommandDescription() {
    return new RedisCommandDescription(RedisCommand.SET, null);
}

public String getKeyFromData(Tuple2<String, Integer> data) {
    return data.f0;
}

public String getValueFromData(Tuple2<String, Integer> data) {
    return data.f1.toString();
}
}

}//

所有代码,我放在了我的公众号,回复Flink可以下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • C# Memory Cache 踩坑记录

    前些天公司服务器数据库访问量偏高,运维人员收到告警推送,安排我团队小伙伴排查原因.

    码农阿宇
  • 2018“金三”之一线互联网公司Java高级面试题总结

    3、知道字节码吗?字节码都有哪些?Integer x =5,int y =5,比较x =y 都经过哪些步骤?

    技术zhai
  • 【独家】终生受用的Redis高可用技术解决方案大全

    Redis 单副本,采用单个Redis节点部署架构,没有备用节点实时同步数据,不提供数据持久化和备份策略,适用于数据可靠性要求不高的纯缓存业务场景。

    技术zhai
  • 消息中间件系列第2讲:如何进行消息队列选型?

    要做技术选型,那么必须对现今的各个消息中间件有个深入的理解才能做技术选型。否则别人问你,你为什么要用这个消息中间件,你说不出个所以然来,怎么做架构师呢?

    陈树义
  • redis之django-redis

    使用中间件,经过一系列的认证等操作,如果内容在缓存中存在,则使用FetchFromCacheMiddleware获取内容并返回给用户, 当返回给用户之前,判断...

    人生不如戏
  • redis的安装与使用

     redis是当前比较热门的NOSQL系统之一,它是一个key-value存储系统。和Memcached类似,但很大程度补偿了memcached的不足,它支持存...

    人生不如戏
  • 每周分享第4期

    我们都知道在JDK1.8的HotSpot虚拟机中已经没有永久代这个内存区域了,但许多人其实并不知道在JDK1.7的时候这项工作就在悄悄进行了。

    陈树义
  • windows 下对redis安装和部署以及连接客户端与操作

    Redis官方并没有提供Redis的windows安装包,但在github上, 有相关的下载地址,如下: https://github.com/Servic...

    学到老
  • R语言自带的数据文件

    R语言有大量的样本数据可以直接用来作为数据分析和挖掘案例,可以收藏着以后用! R:datasets >install.packages("datasets")...

    学到老
  • 解析分布式锁之redis实现

    摘要:分布式架构设计如今在企业中被大量的应用,而在不同的分布式节点进行协同工作的时候,节点服务的时序、结果的正确性以及执行成本也成为了必须考虑的重要因素。其中竞...

    技术zhai

扫码关注云+社区

领取腾讯云代金券