
A Look at RespServer

code4it
Published 2020-08-21 17:01:04
This article is included in the column 码匠的流水账.

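This article walks through RespServer, the Netty-based server class in resp-server 0.16.0, and the classes it relies on.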

Resp

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/Resp.java

interface Resp {
  void channel(SocketChannel channel);
  void connected(ChannelHandlerContext ctx);
  void disconnected(ChannelHandlerContext ctx);
  void receive(ChannelHandlerContext ctx, RedisToken message);
}
  • The Resp interface defines four methods: channel, connected, disconnected, and receive.

RespServer

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServer.java

public class RespServer implements Resp {

  private static final Logger LOGGER = LoggerFactory.getLogger(RespServer.class);

  private static final int BUFFER_SIZE = 1024 * 1024;
  private static final int MAX_FRAME_SIZE = BUFFER_SIZE * 100;

  private static final String DEFAULT_HOST = "localhost";
  private static final int DEFAULT_PORT = 12345;

  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;
  private ChannelFuture future;

  private final RespServerContext serverContext;

  public RespServer(RespServerContext serverContext) {
    this.serverContext = requireNonNull(serverContext);
  }

  public static Builder builder() {
    return new Builder();
  }

  public void start() {
    bossGroup = new NioEventLoopGroup();
    workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2);

    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new RespInitializerHandler(this))
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .option(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
        .option(ChannelOption.SO_SNDBUF, BUFFER_SIZE)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

    future = bootstrap.bind(serverContext.getHost(), serverContext.getPort());
    // Bind and start to accept incoming connections.
    future.syncUninterruptibly();

    serverContext.start();

    LOGGER.info("server started: {}:{}", serverContext.getHost(), serverContext.getPort());
  }

  public void stop() {
    try {
      if (future != null) {
        closeFuture(future.channel().close());
      }
      future = null;
    } finally {
      workerGroup = closeWorker(workerGroup);
      bossGroup = closeWorker(bossGroup);
    }

    serverContext.stop();

    LOGGER.info("server stopped");
  }

  @Override
  public void channel(SocketChannel channel) {
    LOGGER.debug("new channel: {}", sourceKey(channel));

    channel.pipeline().addLast("redisEncoder", new RedisEncoder());
    channel.pipeline().addLast("linDelimiter", new RedisDecoder(MAX_FRAME_SIZE));
    channel.pipeline().addLast(new RespConnectionHandler(this));
  }

  @Override
  public void connected(ChannelHandlerContext ctx) {
    String sourceKey = sourceKey(ctx.channel());
    LOGGER.debug("client connected: {}", sourceKey);
    getSession(ctx, sourceKey);
  }

  @Override
  public void disconnected(ChannelHandlerContext ctx) {
    String sourceKey = sourceKey(ctx.channel());

    LOGGER.debug("client disconnected: {}", sourceKey);

    serverContext.removeSession(sourceKey);
  }

  @Override
  public void receive(ChannelHandlerContext ctx, RedisToken message) {
    String sourceKey = sourceKey(ctx.channel());

    LOGGER.debug("message received: {}", sourceKey);

    parseMessage(message, getSession(ctx, sourceKey))
      .ifPresent(serverContext::processCommand);
  }

  //......

}
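  • RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup (the latter sized at twice the number of available processors), registers RespInitializerHandler as the childHandler, calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()), and then calls serverContext.start(). The channel method adds redisEncoder, linDelimiter (a RedisDecoder limited to MAX_FRAME_SIZE), and a RespConnectionHandler to the pipeline. The receive method runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand).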

RespInitializerHandler

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespInitializerHandler.java

class RespInitializerHandler extends ChannelInitializer<SocketChannel> {

  private final Resp impl;

  RespInitializerHandler(Resp impl) {
    this.impl = impl;
  }

  @Override
  protected void initChannel(SocketChannel channel) throws Exception {
    impl.channel(channel);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    impl.disconnected(ctx);
  }
}
  • RespInitializerHandler extends ChannelInitializer and delegates to the Resp implementation: initChannel calls impl.channel(channel), and channelInactive calls impl.disconnected(ctx).

RedisEncoder

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisEncoder.java

public class RedisEncoder extends MessageToByteEncoder<RedisToken> {
  @Override
  protected void encode(ChannelHandlerContext ctx, RedisToken msg, ByteBuf out) throws Exception {
    out.writeBytes(new RedisSerializer().encodeToken(msg));
  }
}
  • RedisEncoder extends MessageToByteEncoder. Its encode method serializes the RedisToken with RedisSerializer.encodeToken and writes the resulting bytes to the output ByteBuf.
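
The internals of RedisSerializer are not shown above, so the following is only a minimal sketch of what encoding tokens into the RESP wire format looks like, written with plain JDK classes. The class RespEncodingSketch and its method names are hypothetical and are not part of resp-server; only the wire format itself (type prefix, CRLF terminator, length-prefixed bulk strings) comes from the Redis protocol.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical illustration of RESP encoding; NOT the library's RedisSerializer.
final class RespEncodingSketch {

  private static final String CRLF = "\r\n";

  // Simple (status) string, e.g. "+OK\r\n"
  static String status(String text) {
    return "+" + text + CRLF;
  }

  // Error reply, e.g. "-ERR unknown command\r\n"
  static String error(String text) {
    return "-" + text + CRLF;
  }

  // Integer reply, e.g. ":1\r\n"
  static String integer(long value) {
    return ":" + value + CRLF;
  }

  // Bulk string: byte length, CRLF, payload, CRLF; a null value is "$-1\r\n"
  static String bulk(String text) {
    if (text == null) {
      return "$-1" + CRLF;
    }
    int length = text.getBytes(StandardCharsets.UTF_8).length;
    return "$" + length + CRLF + text + CRLF;
  }

  // Array: element count, CRLF, then the already-encoded elements
  static String array(List<String> encodedElements) {
    return "*" + encodedElements.size() + CRLF + String.join("", encodedElements);
  }

  public static void main(String[] args) {
    String command = array(List.of(bulk("GET"), bulk("key")));
    // Make the CRLF terminators visible when printing
    System.out.println(command.replace(CRLF, "\\r\\n"));
    // prints: *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
  }
}
```

In the real encoder the serialized bytes are written into the ByteBuf that Netty passes to encode; the sketch returns strings only to keep the format easy to read.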

RedisDecoder

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisDecoder.java

public class RedisDecoder extends ReplayingDecoder<Void> {

  private final int maxLength;

  public RedisDecoder(int maxLength) {
    this.maxLength = maxLength;
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
    out.add(parseResponse(buffer));
  }

  private RedisToken parseResponse(ByteBuf buffer) {
    RedisToken token = createParser(buffer).next();
    checkpoint();
    return token;
  }

  private RedisParser createParser(ByteBuf buffer) {
    return new RedisParser(maxLength, new NettyRedisSource(this, buffer));
  }

  //......

}
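  • RedisDecoder extends ReplayingDecoder. Its decode method builds a RedisParser over a NettyRedisSource, reads the next RedisToken, and calls checkpoint() once the token has been parsed completely.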
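
To make the role of checkpoint() concrete, here is a small JDK-only sketch of the same idea: input arrives in chunks, a partially received message consumes nothing, and the read position only advances once a whole message has been parsed. ReplaySketch is a hypothetical class, not Netty's ReplayingDecoder or the library's RedisParser, and it assumes ASCII payloads and handles only status, error, integer, and non-null bulk string replies.

```java
import java.util.Optional;

// Hypothetical illustration of checkpoint-based incremental parsing.
final class ReplaySketch {

  private final StringBuilder buffer = new StringBuilder();
  private int checkpoint = 0; // start of the first message not yet fully parsed

  // Append newly received data (ASCII only in this sketch)
  void feed(String chunk) {
    buffer.append(chunk);
  }

  // Returns the next complete payload, or empty if more data is needed
  Optional<String> next() {
    int lineEnd = buffer.indexOf("\r\n", checkpoint);
    if (lineEnd < 0) {
      return Optional.empty(); // header line incomplete: retry after the next feed
    }
    if (lineEnd == checkpoint) {
      throw new IllegalArgumentException("empty header line");
    }
    char type = buffer.charAt(checkpoint);
    String header = buffer.substring(checkpoint + 1, lineEnd);
    int afterHeader = lineEnd + 2;
    switch (type) {
      case '+':
      case '-':
      case ':':
        checkpoint = afterHeader; // whole message consumed
        return Optional.of(header);
      case '$': {
        int length = Integer.parseInt(header);
        if (length < 0) {
          throw new IllegalArgumentException("null bulk strings are not handled here");
        }
        int end = afterHeader + length + 2; // payload plus trailing CRLF
        if (buffer.length() < end) {
          return Optional.empty(); // payload incomplete: checkpoint stays put
        }
        checkpoint = end;
        return Optional.of(buffer.substring(afterHeader, afterHeader + length));
      }
      default:
        throw new IllegalArgumentException("unsupported type: " + type);
    }
  }

  public static void main(String[] args) {
    ReplaySketch decoder = new ReplaySketch();
    decoder.feed("$5\r\nhel");
    System.out.println(decoder.next()); // prints: Optional.empty
    decoder.feed("lo\r\n+OK\r\n");
    System.out.println(decoder.next()); // prints: Optional[hello]
    System.out.println(decoder.next()); // prints: Optional[OK]
  }
}
```

ReplayingDecoder reaches the same result differently: reading past the available bytes raises an internal signal, and Netty rewinds the buffer to the last checkpoint and calls decode again when more data arrives.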

RespServerContext

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/RespServerContext.java

public class RespServerContext implements ServerContext {
  private static final Logger LOGGER = LoggerFactory.getLogger(RespServerContext.class);

  private final StateHolder state = new StateHolder();
  private final ConcurrentHashMap<String, Session> clients = new ConcurrentHashMap<>();
  private final Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

  private final String host;
  private final int port;
  private final CommandSuite commands;
  private SessionListener sessionListener;

  public RespServerContext(String host, int port, CommandSuite commands) {
    this(host, port, commands, nullListener());
  }

  public RespServerContext(String host, int port, CommandSuite commands,
                           SessionListener sessionListener) {
    this.host = requireNonNull(host);
    this.port = requireRange(port, 1024, 65535);
    this.commands = requireNonNull(commands);
    this.sessionListener = sessionListener;
  }

  public void start() {

  }

  public void stop() {
    clear();
    scheduler.shutdown();
  }

  @Override
  public int getClients() {
    return clients.size();
  }

  @Override
  public RespCommand getCommand(String name) {
    return commands.getCommand(name);
  }

  @Override
  public <T> Option<T> getValue(String key) {
    return state.getValue(key);
  }

  @Override
  public <T> Option<T> removeValue(String key) {
    return state.removeValue(key);
  }

  @Override
  public void putValue(String key, Object value) {
    state.putValue(key, value);
  }

  @Override
  public String getHost() {
    return host;
  }

  @Override
  public int getPort() {
    return port;
  }

  Session getSession(String sourceKey, Function<String, Session> factory) {
    return clients.computeIfAbsent(sourceKey, key -> {
      Session session = factory.apply(key);
      sessionListener.sessionCreated(session);
      return session;
    });
  }

  void processCommand(Request request) {
    LOGGER.debug("received command: {}", request);

    RespCommand command = getCommand(request.getCommand());
    try {
      executeOn(execute(command, request))
        .subscribe(response -> processResponse(request, response),
                   ex -> LOGGER.error("error executing command: " + request, ex));
    } catch (RuntimeException ex) {
      LOGGER.error("error executing command: " + request, ex);
    }
  }

  protected CommandSuite getCommands() {
    return commands;
  }

  protected void removeSession(String sourceKey) {
    Session session = clients.remove(sourceKey);
    if (session != null) {
      sessionListener.sessionDeleted(session);
    }
  }

  protected Session getSession(String key) {
    return clients.get(key);
  }

  protected RedisToken executeCommand(RespCommand command, Request request) {
    return command.execute(request);
  }

  protected <T> Observable<T> executeOn(Observable<T> observable) {
    return observable.observeOn(scheduler);
  }

  private void processResponse(Request request, RedisToken token) {
    request.getSession().publish(token);
    if (request.isExit()) {
      request.getSession().close();
    }
  }

  private Observable<RedisToken> execute(RespCommand command, Request request) {
    return Observable.create(observer -> {
      observer.onNext(executeCommand(command, request));
      observer.onComplete();
    });
  }

  private int requireRange(int value, int min, int max) {
    if (value <= min || value > max) {
      throw new IllegalArgumentException(min + " <= " + value + " < " + max);
    }
    return value;
  }

  private void clear() {
    clients.clear();
    state.clear();
  }
}
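  • RespServerContext implements the ServerContext interface. Its constructor requires a non-null host, a non-null commands argument, and a port that passes requireRange(port, 1024, 65535); as written, the check rejects value <= min, so 1024 itself is not accepted. Its processCommand method looks up the RespCommand with getCommand(request.getCommand()), wraps executeCommand in an Observable that is observed on a single-thread scheduler, and hands the resulting RedisToken to processResponse, which publishes it to the request's session and closes the session when request.isExit() is true. processCommand itself returns void.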
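
The session handling in getSession and removeSession can be sketched with plain JDK classes. In the sketch below, SessionRegistrySketch is a hypothetical class, sessions are reduced to strings, the listener is replaced by an event list, and the key "127.0.0.1:50000" is a made-up example, since the sourceKey helper is elided in the listing above. The point is that computeIfAbsent runs the factory only for the first lookup of a key, so the "created" callback fires once per client, and the "deleted" callback fires only when a session was actually removed.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

// Hypothetical illustration of the session registry pattern; sessions are plain strings here.
final class SessionRegistrySketch {

  private final ConcurrentHashMap<String, String> clients = new ConcurrentHashMap<>();

  // Stands in for SessionListener: records which callbacks would have fired
  final List<String> events = new CopyOnWriteArrayList<>();

  // Returns the existing session for the key, or creates one and notifies once
  String getSession(String sourceKey, Function<String, String> factory) {
    return clients.computeIfAbsent(sourceKey, key -> {
      String session = factory.apply(key);
      events.add("created:" + session);
      return session;
    });
  }

  // Removes the session and notifies only if one was registered
  void removeSession(String sourceKey) {
    String session = clients.remove(sourceKey);
    if (session != null) {
      events.add("deleted:" + session);
    }
  }

  int size() {
    return clients.size();
  }

  public static void main(String[] args) {
    SessionRegistrySketch registry = new SessionRegistrySketch();
    String key = "127.0.0.1:50000"; // made-up source key
    registry.getSession(key, k -> "session-" + k); // connected
    registry.getSession(key, k -> "session-" + k); // receive: reuses the session
    registry.removeSession(key);                   // disconnected
    registry.removeSession(key);                   // no session left, no callback
    System.out.println(registry.events);
    // prints: [created:session-127.0.0.1:50000, deleted:session-127.0.0.1:50000]
  }
}
```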

RedisToken

resp-server-0.16.0/src/main/java/com/github/tonivade/resp/protocol/RedisToken.java

public interface RedisToken {
  RedisToken NULL_STRING = string((SafeString) null);
  RedisToken RESPONSE_OK = status("OK");

  RedisTokenType getType();

  <T> T accept(RedisTokenVisitor<T> visitor);

  static RedisToken nullString() {
    return NULL_STRING;
  }

  static RedisToken responseOk() {
    return RESPONSE_OK;
  }

  static RedisToken string(SafeString str) {
    return new StringRedisToken(str);
  }

  static RedisToken string(String str) {
    return new StringRedisToken(safeString(str));
  }

  static RedisToken status(String str) {
    return new StatusRedisToken(str);
  }

  static RedisToken integer(boolean b) {
    return new IntegerRedisToken(b ? 1 : 0);
  }

  static RedisToken integer(int i) {
    return new IntegerRedisToken(i);
  }

  static RedisToken error(String str) {
    return new ErrorRedisToken(str);
  }

  static RedisToken array(RedisToken... redisTokens) {
    return new ArrayRedisToken(ImmutableList.of(redisTokens));
  }

  static RedisToken array(Collection<RedisToken> redisTokens) {
    return new ArrayRedisToken(ImmutableList.from(redisTokens));
  }

  static RedisToken array(Sequence<RedisToken> redisTokens) {
    return new ArrayRedisToken(redisTokens.asArray());
  }

  static <T> Stream<T> visit(Stream<RedisToken> tokens, RedisTokenVisitor<T> visitor) {
    return tokens.map(token -> token.accept(visitor));
  }
}
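  • The RedisToken interface defines the getType and accept methods, plus static factory methods (string, status, integer, error, array, nullString, responseOk) and a visit helper that applies a RedisTokenVisitor to a stream of tokens.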
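
The accept method follows the visitor pattern: each token type calls the visitor method that matches it, so callers can add new operations without type checks. RedisTokenVisitor itself is not shown in this article, so the sketch below uses hypothetical Token and Visitor interfaces with made-up method names to illustrate the shape of the pattern; it is not the library's API.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the visitor pattern; names do not come from resp-server.
final class TokenVisitorSketch {

  interface Token {
    <T> T accept(Visitor<T> visitor);
  }

  interface Visitor<T> {
    T string(String value);
    T integer(int value);
    T array(List<Token> elements);
  }

  // Factory methods in the spirit of RedisToken.string, integer, and array
  static Token string(String value) {
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.string(value);
      }
    };
  }

  static Token integer(int value) {
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.integer(value);
      }
    };
  }

  static Token array(Token... elements) {
    List<Token> list = List.of(elements);
    return new Token() {
      @Override
      public <T> T accept(Visitor<T> visitor) {
        return visitor.array(list);
      }
    };
  }

  // One possible operation: render a token tree as readable text
  static final class Renderer implements Visitor<String> {
    @Override
    public String string(String value) {
      return "\"" + value + "\"";
    }

    @Override
    public String integer(int value) {
      return Integer.toString(value);
    }

    @Override
    public String array(List<Token> elements) {
      return elements.stream()
          .map(token -> token.accept(this))
          .collect(Collectors.joining(", ", "[", "]"));
    }
  }

  public static void main(String[] args) {
    Token reply = array(string("a"), integer(1), array(string("b")));
    System.out.println(reply.accept(new Renderer()));
    // prints: ["a", 1, ["b"]]
  }
}
```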

Summary

RespServer implements the Resp interface. Its start method creates bossGroup and workerGroup, registers RespInitializerHandler as the childHandler, then calls bootstrap.bind(serverContext.getHost(), serverContext.getPort()) followed by serverContext.start(). Its channel method adds redisEncoder, linDelimiter, and RespConnectionHandler to the pipeline, and its receive method runs parseMessage(message, getSession(ctx, sourceKey)).ifPresent(serverContext::processCommand).

Originally published on 2020-08-14 on the WeChat official account 码匠的流水账.