A look at claudb's pubsub commands

code4it
Modified 2020-08-31 10:02:43

This article walks through the pubsub commands in claudb.

PublishCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PublishCommand.java

@Command("publish")
@ParamLength(2)
public class PublishCommand implements DBCommand, SubscriptionSupport, PatternSubscriptionSupport {

  @Override
  public RedisToken execute(Database db, Request request) {
    // param 0 is the channel name, param 1 is the message payload
    String channel = request.getParam(0).toString();
    SafeString message = request.getParam(1);
    return integer(publishAll(getClauDB(request.getServerContext()), channel, message));
  }

  private int publishAll(DBServerContext server, String channel, SafeString message) {
    // the reply is the sum of the counts returned by publish and patternPublish
    int count = publish(server, channel, message);
    int pcount = patternPublish(server, channel, message);
    return count + pcount;
  }
}
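  • PublishCommand implements the DBCommand, SubscriptionSupport, and PatternSubscriptionSupport interfaces. Its execute method reads the channel and the message from the request, calls publishAll(getClauDB(request.getServerContext()), channel, message), and returns the result as an integer reply. publishAll adds the count returned by publish to the count returned by patternPublish.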
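
The counting done by publishAll can be illustrated with a small stand-alone sketch. It is not claudb's implementation: the class PublishSketch, its two maps, and the simplified trailing-`*` pattern rule are all assumptions made for illustration, since the bodies of publish and patternPublish are not shown in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory registry; names and structure are assumptions.
class PublishSketch {
  // channel name -> ids of sessions subscribed to exactly that channel
  private final Map<String, Set<String>> channels = new HashMap<>();
  // pattern -> ids of sessions subscribed to that pattern
  private final Map<String, Set<String>> patterns = new HashMap<>();
  // record of deliveries, formatted as "sessionId:message"
  final List<String> delivered = new ArrayList<>();

  void subscribe(String sessionId, String channel) {
    channels.computeIfAbsent(channel, k -> new HashSet<>()).add(sessionId);
  }

  void psubscribe(String sessionId, String pattern) {
    patterns.computeIfAbsent(pattern, k -> new HashSet<>()).add(sessionId);
  }

  // Deliver to sessions subscribed to the exact channel; return how many.
  int publish(String channel, String message) {
    Set<String> sessions = channels.getOrDefault(channel, Collections.<String>emptySet());
    for (String id : sessions) {
      delivered.add(id + ":" + message);
    }
    return sessions.size();
  }

  // Deliver once per (pattern, session) pair whose pattern matches the channel.
  int patternPublish(String channel, String message) {
    int count = 0;
    for (Map.Entry<String, Set<String>> entry : patterns.entrySet()) {
      if (matches(entry.getKey(), channel)) {
        for (String id : entry.getValue()) {
          delivered.add(id + ":" + message);
        }
        count += entry.getValue().size();
      }
    }
    return count;
  }

  // Same shape as the publishAll shown in the article: direct + pattern.
  int publishAll(String channel, String message) {
    return publish(channel, message) + patternPublish(channel, message);
  }

  // Simplification: only a trailing '*' is treated as a wildcard.
  private static boolean matches(String pattern, String channel) {
    if (pattern.endsWith("*")) {
      return channel.startsWith(pattern.substring(0, pattern.length() - 1));
    }
    return pattern.equals(channel);
  }
}
```

In this sketch, two sessions subscribed to news.sport plus one session subscribed to the pattern news.* give a publishAll result of 3 for a message on news.sport.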

SubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/SubscribeCommand.java

@ReadOnly
@Command("subscribe")
@ParamLength(1)
@PubSubAllowed
public class SubscribeCommand implements DBCommand, SubscriptionSupport {

  private static final String SUBSCRIBE = "subscribe";

  @Override
  public RedisToken execute(Database db, Request request) {
    Database admin = getAdminDatabase(request.getServerContext());
    String sessionId = getSessionId(request);
    Sequence<SafeString> channels = getChannels(request);
    // the counter starts at the number of subscriptions the session already holds
    int i = channels.size();
    List<Object> result = new LinkedList<>();
    for (SafeString channel : request.getParams()) {
      // record the subscription in the admin database and in the session state
      addSubscription(admin, sessionId, channel);
      getSessionState(request.getSession()).addSubscription(channel);
      result.addAll(asList(SUBSCRIBE, channel, ++i));
    }
    return convert(result);
  }

  private String getSessionId(Request request) {
    return request.getSession().getId();
  }

  private Sequence<SafeString> getChannels(Request request) {
    return getSessionState(request.getSession()).getSubscriptions();
  }
}
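  • SubscribeCommand implements the DBCommand and SubscriptionSupport interfaces. Its execute method iterates over the request parameters and, for each channel, calls addSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).addSubscription(channel). Each channel contributes a (subscribe, channel, count) triple to the reply, with the count starting from the session's current number of subscriptions.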
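
To make the shape of the reply concrete, here is a minimal sketch of the same loop using plain Java collections. SubscribeReplySketch and its Set-based session state are hypothetical stand-ins; claudb's real session state and the convert step are not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one client session; not claudb's actual class.
class SubscribeReplySketch {
  // channels this session is currently subscribed to
  private final Set<String> subscriptions = new LinkedHashSet<>();

  // Builds a flat reply: "subscribe", channel, running count, repeated per channel.
  List<Object> subscribe(String... channels) {
    int i = subscriptions.size(); // start from the current subscription count
    List<Object> result = new ArrayList<>();
    for (String channel : channels) {
      subscriptions.add(channel);
      result.addAll(Arrays.asList("subscribe", channel, ++i));
    }
    return result;
  }
}
```

Under these assumptions, subscribing a fresh session to a and b yields the flat list subscribe, a, 1, subscribe, b, 2, and a later call for c continues the count at 3.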

UnsubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/UnsubscribeCommand.java

@ReadOnly
@Command("unsubscribe")
@ParamLength(1)
@PubSubAllowed
public class UnsubscribeCommand implements DBCommand, SubscriptionSupport {

  private static final String UNSUBSCRIBE = "unsubscribe";

  @Override
  public RedisToken execute(Database db, Request request) {
    Database admin = getAdminDatabase(request.getServerContext());
    String sessionId = getSessionId(request);
    Sequence<SafeString> channels = getChannels(request);
    // the counter starts at the number of subscriptions the session currently holds
    int i = channels.size();
    List<Object> result = new LinkedList<>();
    for (SafeString channel : request.getParams()) {
      // remove the subscription from the admin database and from the session state
      removeSubscription(admin, sessionId, channel);
      getSessionState(request.getSession()).removeSubscription(channel);
      result.addAll(asList(UNSUBSCRIBE, channel, --i));
    }
    return convert(result);
  }

  private String getSessionId(Request request) {
    return request.getSession().getId();
  }

  private Sequence<SafeString> getChannels(Request request) {
    return getSessionState(request.getSession()).getSubscriptions();
  }
}
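  • UnsubscribeCommand implements the DBCommand and SubscriptionSupport interfaces. Its execute method iterates over the request parameters and, for each channel, calls removeSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).removeSubscription(channel). Each channel contributes an (unsubscribe, channel, count) triple to the reply, with the count decremented once per channel.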
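
The sketch below mirrors the counter logic of the loop quoted above, with a plain Set standing in for the session state (an assumption: UnsubscribeReplySketch is hypothetical and the behaviour of claudb's real session state is not shown in this article). It makes one detail of the loop visible: the counter is decremented once per argument, whether or not the channel was actually subscribed, so in this sketch the reported count can differ from the number of subscriptions that remain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for one client session; not claudb's actual class.
class UnsubscribeReplySketch {
  private final Set<String> subscriptions = new LinkedHashSet<>();

  UnsubscribeReplySketch(String... initial) {
    subscriptions.addAll(Arrays.asList(initial));
  }

  // Builds a flat reply: "unsubscribe", channel, running count, repeated per channel.
  List<Object> unsubscribe(String... channels) {
    int i = subscriptions.size(); // start from the current subscription count
    List<Object> result = new ArrayList<>();
    for (String channel : channels) {
      subscriptions.remove(channel); // no-op if the channel was never subscribed
      result.addAll(Arrays.asList("unsubscribe", channel, --i)); // decremented regardless
    }
    return result;
  }

  // Number of subscriptions actually left in the set.
  int remaining() {
    return subscriptions.size();
  }
}
```

For example, a session subscribed only to a that unsubscribes from x gets the triple unsubscribe, x, 0 in this sketch, while remaining() still returns 1.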

PatternSubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternSubscribeCommand.java

@ReadOnly
@Command("psubscribe")
@ParamLength(1)
@PubSubAllowed
public class PatternSubscribeCommand implements DBCommand, PatternSubscriptionSupport {

  private static final String PSUBSCRIBE = "psubscribe";

  @Override
  public RedisToken execute(Database db, Request request) {
    Database admin = getAdminDatabase(request.getServerContext());
    String sessionId = getSessionId(request);
    Sequence<SafeString> channels = getChannels(request);
    // the counter starts at the number of subscriptions the session already holds
    int i = channels.size();
    List<Object> result = new LinkedList<>();
    for (SafeString pattern : request.getParams()) {
      // record the pattern in the admin database and in the session state
      addPatternSubscription(admin, sessionId, pattern);
      getSessionState(request.getSession()).addSubscription(pattern);
      result.addAll(asList(PSUBSCRIBE, pattern, ++i));
    }
    return convert(result);
  }

  private String getSessionId(Request request) {
    return request.getSession().getId();
  }

  private Sequence<SafeString> getChannels(Request request) {
    return getSessionState(request.getSession()).getSubscriptions();
  }
}
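  • PatternSubscribeCommand implements the DBCommand and PatternSubscriptionSupport interfaces. Its execute method iterates over the request parameters and, for each pattern, calls addPatternSubscription(admin, sessionId, pattern) and getSessionState(request.getSession()).addSubscription(pattern). The session state stores patterns through the same addSubscription call that SubscribeCommand uses for channels.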
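
A pattern subscription only becomes useful once a published channel name is matched against it. The article does not show how claudb performs that match, so the following is a generic sketch of glob matching under stated assumptions: GlobSketch is a hypothetical helper, it supports only `*` (any run of characters) and `?` (exactly one character), and it omits character classes and escapes that Redis-style globs also allow.

```java
import java.util.regex.Pattern;

// Hypothetical glob matcher; not the matcher used by claudb.
class GlobSketch {

  // Translate a glob into a regex: '*' -> ".*", '?' -> ".", anything else literal.
  static Pattern compile(String glob) {
    StringBuilder regex = new StringBuilder();
    for (char c : glob.toCharArray()) {
      switch (c) {
        case '*':
          regex.append(".*");
          break;
        case '?':
          regex.append('.');
          break;
        default:
          // quote every other character so regex metacharacters stay literal
          regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.toString(), Pattern.DOTALL);
  }

  // True when the whole channel name matches the glob.
  static boolean matches(String glob, String channel) {
    return compile(glob).matcher(channel).matches();
  }
}
```

With this helper, news.* matches news.sport but not newsXsport, because the dot in the glob is treated as a literal character.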

PatternUnsubscribeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/command/pubsub/PatternUnsubscribeCommand.java

@ReadOnly
@Command("punsubscribe")
@ParamLength(1)
@PubSubAllowed
public class PatternUnsubscribeCommand implements DBCommand, PatternSubscriptionSupport {

  private static final String PUNSUBSCRIBE = "punsubscribe";

  @Override
  public RedisToken execute(Database db, Request request) {
    Database admin = getAdminDatabase(request.getServerContext());
    String sessionId = getSessionId(request);
    Sequence<SafeString> channels = getChannels(request);
    // the counter starts at the number of subscriptions the session currently holds
    int i = channels.size();
    List<Object> result = new LinkedList<>();
    for (SafeString channel : request.getParams()) {
      // remove the pattern from the admin database and from the session state
      removePatternSubscription(admin, sessionId, channel);
      getSessionState(request.getSession()).removeSubscription(channel);
      result.addAll(asList(PUNSUBSCRIBE, channel, --i));
    }
    return convert(result);
  }

  private String getSessionId(Request request) {
    return request.getSession().getId();
  }

  private Sequence<SafeString> getChannels(Request request) {
    return getSessionState(request.getSession()).getSubscriptions();
  }
}
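  • PatternUnsubscribeCommand implements the DBCommand and PatternSubscriptionSupport interfaces. Its execute method iterates over the request parameters and, for each pattern (named channel in the loop), calls removePatternSubscription(admin, sessionId, channel) and getSessionState(request.getSession()).removeSubscription(channel).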
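
All four subscribe and unsubscribe commands update two places: the admin database and the per-session state. One plausible reading is that publishing needs a lookup from channel to sessions, while the reply counter needs a lookup from session to channels. The sketch below illustrates that two-sided bookkeeping with plain maps; BookkeepingSketch and its methods are hypothetical, and the way claudb actually stores subscriptions in the admin database is not shown in this article.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical two-sided index; not claudb's storage layout.
class BookkeepingSketch {
  // "admin" side: channel -> session ids, consulted when publishing
  private final Map<String, Set<String>> byChannel = new HashMap<>();
  // session side: session id -> channels, consulted for the reply counter
  private final Map<String, Set<String>> bySession = new HashMap<>();

  void add(String sessionId, String channel) {
    byChannel.computeIfAbsent(channel, k -> new HashSet<>()).add(sessionId);
    bySession.computeIfAbsent(sessionId, k -> new HashSet<>()).add(channel);
  }

  void remove(String sessionId, String channel) {
    removeFrom(byChannel, channel, sessionId);
    removeFrom(bySession, sessionId, channel);
  }

  // Remove a value from the set under key, dropping the key once the set is empty.
  private static void removeFrom(Map<String, Set<String>> index, String key, String value) {
    Set<String> values = index.get(key);
    if (values != null) {
      values.remove(value);
      if (values.isEmpty()) {
        index.remove(key);
      }
    }
  }

  Set<String> subscribers(String channel) {
    return byChannel.getOrDefault(channel, Collections.<String>emptySet());
  }

  int subscriptionCount(String sessionId) {
    return bySession.getOrDefault(sessionId, Collections.<String>emptySet()).size();
  }
}
```

Keeping both indexes in step inside add and remove is the design point here: updating only one of them would leave either publishing or the reply counter working from stale data.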

Summary

The pubsub-related commands in claudb are PublishCommand, SubscribeCommand, UnsubscribeCommand, PatternSubscribeCommand, and PatternUnsubscribeCommand.

doc

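Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to request removal.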