前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Storm与Redis集成

Storm与Redis集成

作者头像
smartsi
发布2019-08-07 10:09:21
5930
发布2019-08-07 10:09:21
举报
文章被收录于专栏:SmartSiSmartSi

Storm-redis 使用 Jedis 作为 Redis 客户端。

1. 如何使用

添加Maven依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
    <type>jar</type>
</dependency>

2. 常用Bolt

Storm-redis 提供了基本的 Bolt 实现:RedisLookupBolt,RedisStoreBolt 以及 RedisFilterBolt。

根据名字我们就可以知道其功能,RedisLookupBolt 从 Redis 中检索指定键的值,RedisStoreBolt 将键/值存储到 Redis 上。RedisFilterBolt 过滤键或字段不在 Redis 上的元组。

一个元组匹配一个键/值对,你可以在 TupleMapper 中定义匹配模式。你还可以从 RedisDataTypeDescription 中选择你需要的数据类型。通过 RedisDataTypeDescription.RedisDataType 来查看支持哪些数据类型。一些数据类型,例如散列,有序集,还需要指定额外的键来将元组转换为元素:

代码语言:javascript
复制
public RedisDataTypeDescription(RedisDataType dataType, String additionalKey) {
    this.dataType = dataType;
    this.additionalKey = additionalKey;
    if (dataType == RedisDataType.HASH ||
            dataType == RedisDataType.SORTED_SET || dataType == RedisDataType.GEO) {
        if (additionalKey == null) {
            throw new IllegalArgumentException("Hash, Sorted Set and GEO should have additional key");
        }
    }
}

这些接口与 RedisLookupMapper,RedisStoreMapper 以及 RedisFilterMapper 组合使用,分别适用于 RedisLookupBolt,RedisStoreBolt 以及 RedisFilterBolt。当你实现 RedisFilterMapper 时,请确保在 declareOutputFields() 中声明与输入流相同的字段,因为 FilterBolt 只是转发存在 Redis 上输入元组。

2.1 RedisLookupBolt

实现RedisLookupMapper:

代码语言:javascript
复制
class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

根据如下方式使用:

代码语言:javascript
复制
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisLookupMapper lookupMapper = new WordCountRedisLookupMapper();
RedisLookupBolt lookupBolt = new RedisLookupBolt(poolConfig, lookupMapper);
2.2 RedisStoreBolt

实现RedisStoreMapper:

代码语言:javascript
复制
class WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
            RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}

根据如下方式使用:

代码语言:javascript
复制
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(host).setPort(port).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
2.3 RedisFilterBolt

实现RedisFilterMapper:

代码语言:javascript
复制
class BlacklistWordFilterMapper implements RedisFilterMapper {
    private RedisDataTypeDescription description;
    private final String setKey = "blacklist";

    public BlacklistWordFilterMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.SET, setKey);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

根据如下方式使用:

代码语言:javascript
复制
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
        .setHost(host).setPort(port).build();
RedisFilterMapper filterMapper = new BlacklistWordFilterMapper();
RedisFilterBolt filterBolt = new RedisFilterBolt(poolConfig, filterMapper);

3. 自定义Bolt

如果你的场景不适合 RedisStoreBolt,RedisLookupBolt 以及 RedisFilterBolt,那么 storm-redis 还提供了 AbstractRedisBolt,你可以自定义自己的业务逻辑。

代码语言:javascript
复制
public static class LookupWordTotalCountBolt extends AbstractRedisBolt {
    private static final Logger LOG = LoggerFactory.getLogger(LookupWordTotalCountBolt.class);
    private static final Random RANDOM = new Random();

    public LookupWordTotalCountBolt(JedisPoolConfig config) {
        super(config);
    }

    public LookupWordTotalCountBolt(JedisClusterConfig config) {
        super(config);
    }

    @Override
    public void execute(Tuple input) {
        JedisCommands jedisCommands = null;
        try {
            jedisCommands = getInstance();
            String wordName = input.getStringByField("word");
            String countStr = jedisCommands.get(wordName);
            if (countStr != null) {
                int count = Integer.parseInt(countStr);
                this.collector.emit(new Values(wordName, count));

                // print lookup result with low probability
                if(RANDOM.nextInt(1000) > 995) {
                    LOG.info("Lookup result - word : " + wordName + " / count : " + count);
                }
            } else {
                // skip
                LOG.warn("Word not found in Redis - word : " + wordName);
            }
        } finally {
            if (jedisCommands != null) {
                returnInstance(jedisCommands);
            }
            this.collector.ack(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // wordName, count
        declarer.declare(new Fields("wordName", "count"));
    }
}

4. Trident State 用法

  • RedisState 和 RedisMapState,为单机 Redis 模式提供了 Jedis 接口。
  • RedisClusterState 和 RedisClusterMapState,为 Redis 集群模式提供了 JedisCluster 接口。

RedisState:

代码语言:javascript
复制
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
    .setHost(redisHost).setPort(redisPort)
    .build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisState.Factory factory = new RedisState.Factory(poolConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory,
    fields,
    new RedisStateUpdater(storeMapper).withExpire(86400000),
    new Fields()
);

TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
    new RedisStateQuerier(lookupMapper),
    new Fields("columnName","columnValue")
);

RedisClusterState:

代码语言:javascript
复制
Set<InetSocketAddress> nodes = new HashSet<InetSocketAddress>();
for (String hostPort : redisHostPort.split(",")) {
    String[] host_port = hostPort.split(":");
    nodes.add(new InetSocketAddress(host_port[0], Integer.valueOf(host_port[1])));
}
JedisClusterConfig clusterConfig = new JedisClusterConfig.Builder().setNodes(nodes).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisLookupMapper lookupMapper = new WordCountLookupMapper();
RedisClusterState.Factory factory = new RedisClusterState.Factory(clusterConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

stream.partitionPersist(factory, fields,
    new RedisClusterStateUpdater(storeMapper).withExpire(86400000,
    new Fields()
);

TridentState state = topology.newStaticState(factory);
stream = stream.stateQuery(state, new Fields("word"),
    new RedisClusterStateQuerier(lookupMapper),
    new Fields("columnName","columnValue")
);

storm版本:2.0.0-SNAPSHOT

原文:Storm Redis Integration

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 如何使用
  • 2. 常用Bolt
    • 2.1 RedisLookupBolt
      • 2.2 RedisStoreBolt
        • 2.3 RedisFilterBolt
        • 3. 自定义Bolt
        • 4. Trident State 用法
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档