前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >tron-网络模型-AdvService广播服务

tron-网络模型-AdvService广播服务

作者头像
潇洒
发布2023-10-23 14:51:08
1810
发布2023-10-23 14:51:08
举报
文章被收录于专栏:石头岛石头岛

AdvService 作用

AdvService 负责将数据广播到tron网络当中。 基础框架是netty,在此之上开发AdvService对业务进行了封装。

数据包括:

  1. 交易
  2. 区块

需要注意的是,tronAdvService的这套广播逻辑,不是单向广播,而是双向互动。 啥意思,就是说,一般理解,一条数据广播出去后,就广播到对方节上了。 但是tron的广播不是这样,而是先广播一个交易ID到目录节点上,目标节点收到ID后,再发一条请求接取的网络请求,把数据接回去!!!! 是不是有点反直觉!!!

AdvService 主要成员

invToFetch: invToSpread: 待广播的数据:交易、区块 invToFetchCache:

主要方法

consumerInvToSpread: 处理发送队列 consumerInvToFetch: 处理拉取队列 broadcast: 广播

处理流程

  1. broadcast: 构建广播消息体:只包含ID
    1. 将数据添加入trxCache/blockCache
    2. 封装item
    3. 保存待发送消息: invToSpread.put(item)
  2. consumerInvToSpread: 处理 invToSpread

广播 id

需要发送的数据如:交易区块,通过调用AdvService.broadcastid广播。 但是广播并不是一调用broadcast就发送出去的,还需要在各个队列中导来导去好几次。 调用栈

代码语言:javascript
复制
broadcast()
\--consumerInvToSpread()
   \--invSender.sendInv();
       \--peer.sendMessage(new InventoryMessage(value, key));
          \--msgQueue.sendMessage(message);
             \--requestQueue.add(new MessageRoundTrip(msg));

步骤:

  1. 判断block/trx blockCache //接收到的数据进缓存 trxCache //接收到的数据进缓存
  2. 将数据装成 item
  3. invToSpread.put(item)
  4. consumerInvToSpread //处理 invToSpread 的中数据

具体实现:

代码语言:javascript
复制
public void broadcast(Message msg) {
  // 如果是 fastForward 节点不广播
  if (fastForward) {
    return;
  }

  // MAX_SPREAD_SIZE = 1_000
  if (invToSpread.size() > MAX_SPREAD_SIZE) {
    logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
    return;
  }

  Item item;
  if (msg instanceof BlockMessage) {
    BlockMessage blockMsg = (BlockMessage) msg;
    // 注意这里,是id,而不是区块本身
    item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
    logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
    // 把Block中的交易放到trxCache
    blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
      Sha256Hash tid = transactionCapsule.getTransactionId();
      invToSpread.remove(tid);
      trxCache.put(new Item(tid, InventoryType.TRX),
          new TransactionMessage(transactionCapsule.getInstance()));
    });
    blockCache.put(item, msg);
  } else if (msg instanceof TransactionMessage) {
    TransactionMessage trxMsg = (TransactionMessage) msg;
    // getMessageId 是交易id
    item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
    trxCount.add();
    trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
  } else {
    logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
    return;
  }

  invToSpread.put(item, System.currentTimeMillis());

  if (InventoryType.BLOCK.equals(item.getType())) {
    consumerInvToSpread();
  }
}

@Override
public Sha256Hash getMessageId() {
  return this.transactionCapsule.getTransactionId();
}

@Override
public Sha256Hash getMessageId() {
  return getBlockCapsule().getBlockId();
}

consumerInvToSpread 中处理

处理过程:

  1. 拿到所有活跃PeerConnection
  2. 创建InvSender
  3. 把数据copy到 peer 的AdvInvSpread
代码语言:javascript
复制
private synchronized void consumerInvToSpread() {
  // 判断 peer 状态,这段代码不能忽略
  // peer.isNeedSyncFromPeer() 表示这个peer的状态,正在同步,区块状态不完整,不完整的不要广播给它
  // peer.isNeedSyncFromUs() 表示peer,正在给别的节点同步数据:
  //   区块还不完整,那么链的状态也是不完整的,广播过去的交易,它们也处理不了
  List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
      .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
      .collect(Collectors.toList());

  if (invToSpread.isEmpty() || peers.isEmpty()) {
    return;
  }

  InvSender invSender = new InvSender();

  invToSpread.forEach((item, time) -> peers.forEach(peer -> {
    if (peer.getAdvInvReceive().getIfPresent(item) == null
        && peer.getAdvInvSpread().getIfPresent(item) == null
        && !(item.getType().equals(InventoryType.BLOCK)       // 如果是Block 且 超过3秒就不广播了
        && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
      // 把交易塞到 peer 的 AdvInvSpread 队列
      peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
      invSender.add(item, peer);
    }
    // 移除提交易
    invToSpread.remove(item);
  }));

  invSender.sendInv();
}

//TODO

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • AdvService 作用
    • AdvService 主要成员
      • 主要方法
      • 处理流程
      • 广播 id
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档