前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >tron 接收交易和广播交易

tron 接收交易和广播交易

作者头像
潇洒
发布2023-10-20 11:09:58
3600
发布2023-10-20 11:09:58
举报
文章被收录于专栏:石头岛

前言

分析tron是如何接收到交易,并在接收到交易后,后续是如何处理的,交易处理细节可以看看:tron 交易处理--交易执行逻辑

接收交易

节点使用netty进行P2P连接,主要使用到的类:

  1. TransactionsMsgHandler: netty Handler处理器
  2. TronNetService: 消息分发
  3. AdvService: 消息广播
  4. FetchInvDataMsgHandler: 消息拉取

交易处理调用栈:

代码语言:javascript
复制
TronNetHandler.channelRead0 接收消息
\--TronNetService.onMessage 分发消息
   \--transactionsMsgHandler.processMessage; 具体业务处理

TronNetService.onMessage 分发消息

代码语言:javascript
复制
protected void onMessage(PeerConnection peer, TronMessage msg) {
  long startTime = System.currentTimeMillis();
  try {
    switch (msg.getType()) {
      case SYNC_BLOCK_CHAIN:
        syncBlockChainMsgHandler.processMessage(peer, msg);
        break;
      case BLOCK_CHAIN_INVENTORY:
        chainInventoryMsgHandler.processMessage(peer, msg);
        break;
      case INVENTORY:
        inventoryMsgHandler.processMessage(peer, msg);
        break;
      case FETCH_INV_DATA:
        fetchInvDataMsgHandler.processMessage(peer, msg);
        break;
      case BLOCK:
        blockMsgHandler.processMessage(peer, msg);
        break;
      case TRXS:
        // 交易处理入口
        transactionsMsgHandler.processMessage(peer, msg);
        break;
      case PBFT_COMMIT_MSG:
        pbftDataSyncHandler.processMessage(peer, msg);
        break;
      default:
        throw new P2pException(TypeEnum.NO_SUCH_MESSAGE, msg.getType().toString());
    }
  } catch (Exception e) {
    processException(peer, msg, e);
  } finally {
    long costs = System.currentTimeMillis() - startTime;
    if (costs > DURATION_STEP) {
      logger.info("Message processing costs {} ms, peer: {}, type: {}, time tag: {}",
          costs, peer.getInetAddress(), msg.getType(), getTimeTag(costs));
      Metrics.histogramObserve(MetricKeys.Histogram.MESSAGE_PROCESS_LATENCY,
          costs / Metrics.MILLISECONDS_PER_SECOND, msg.getType().name());
    }
  }
}

TransactionsMsgHandler

接收到的交易先放在线程池: trxHandlePool

再由trxHandlePool调用handleTransaction处理交易。

普通交易智能合约的交易,处理还不一样。

先看下交易缓冲池:

代码语言:javascript
复制
// 无界队列
private BlockingQueue<Runnable> queue = new LinkedBlockingQueue();
// 工作线程数
private int threadNum = Args.getInstance().getValidateSignThreadNum();
private ExecutorService trxHandlePool = new ThreadPoolExecutor(threadNum, threadNum, 0L,
    TimeUnit.MILLISECONDS, queue);

processMessage 消息处理入口

区分普通交易和合约交易,另外会统计队列大小

代码语言:javascript
复制
@Override
public void processMessage(PeerConnection peer, TronMessage msg) throws P2pException {
  TransactionsMessage transactionsMessage = (TransactionsMessage) msg;
  check(peer, transactionsMessage);
  int smartContractQueueSize = 0;
  int trxHandlePoolQueueSize = 0;
  int dropSmartContractCount = 0;

  // 遍历交易
  for (Transaction trx : transactionsMessage.getTransactions().getTransactionsList()) {
    int type = trx.getRawData().getContract(0).getType().getNumber();
    // 合约类型交易
    if (type == ContractType.TriggerSmartContract_VALUE
        || type == ContractType.CreateSmartContract_VALUE) {
      // 合约类型交易没有直接执行,而是添加到了 smartContractQueue 队列当中
      // 注意,这里用的是 !offer,也就是说插入失败了,超过限制
      // MAX_TRX_SIZE = 50_000
      if (!smartContractQueue.offer(new TrxEvent(peer, new TransactionMessage(trx)))) {
        smartContractQueueSize = smartContractQueue.size();
        // queue 是线程池的队列长度
        trxHandlePoolQueueSize = queue.size();
        dropSmartContractCount++;
      }
      // 没有 else 处理,那这笔交易就丢掉了!!!
    } else {
    // 普通交易
      trxHandlePool.submit(() -> handleTransaction(peer, new TransactionMessage(trx)));
    }
  }

  // 上面没有else处理,但是这里加了判断,会打印出队列长度
  if (dropSmartContractCount > 0) {
    logger.warn("Add smart contract failed, drop count: {}, queueSize {}:{}",
        dropSmartContractCount, smartContractQueueSize, trxHandlePoolQueueSize);
  }
}

智能合约处理 handleSmartContract

智能合约交易,会有单独的线程来处理:

代码语言:javascript
复制
private void handleSmartContract() {
  // 这是个单线程的延时处理线程池
  // 也就是智能合约的交易,20ms执行一次
  smartContractExecutor.scheduleWithFixedDelay(() -> {
    try {
      // 限制 MAX_SMART_CONTRACT_SUBMIT_SIZE = 100
      // 那 queue 里数据多了,还执行不了!!
      // 也就是 queue 一定要先消费到 < MAX_SMART_CONTRACT_SUBMIT_SIZE
      while (queue.size() < MAX_SMART_CONTRACT_SUBMIT_SIZE) {
        TrxEvent event = smartContractQueue.take();
        trxHandlePool.submit(() -> handleTransaction(event.getPeer(), event.getMsg()));
      }
    } catch (InterruptedException e) {
      logger.warn("Handle smart server interrupted");
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      logger.error("Handle smart contract exception", e);
    }
  }, 1000, 20, TimeUnit.MILLISECONDS);
}

交易处理、广播 TransactionsMsgHandler.handleTransaction

调用栈

代码语言:javascript
复制
TransactionsMsgHandler.handleTransaction
\--AdvService.broadcast: 广播服务

在这里可以看到,每个tron节点在接到到交易到后:

  1. 先自己处理
  2. 再广播交易

广播也挺复杂,单独写个博客细扣。

代码语言:javascript
复制
private void handleTransaction(PeerConnection peer, TransactionMessage trx) {
  if (peer.isDisconnect()) {
    logger.warn("Drop trx {} from {}, peer is disconnect", trx.getMessageId(),
        peer.getInetAddress());
    return;
  }

  // 广播队列验重
  if (advService.getMessage(new Item(trx.getMessageId(), InventoryType.TRX)) != null) {
    return;
  }

  try {
    tronNetDelegate.pushTransaction(trx.getTransactionCapsule());
    // 广播交易
    advService.broadcast(trx);
  } catch (P2pException e) {
    logger.warn("Trx {} from peer {} process failed. type: {}, reason: {}",
        trx.getMessageId(), peer.getInetAddress(), e.getType(), e.getMessage());
    // 如果是 BAD_TRX 断开连接
    if (e.getType().equals(TypeEnum.BAD_TRX)) {
      peer.disconnect(ReasonCode.BAD_TX);
    }
  } catch (Exception e) {
    logger.error("Trx {} from peer {} process failed", trx.getMessageId(), peer.getInetAddress(),
        e);
  }
}

广播数据 AdvService.broadcast

首先要明确一个点:广播过去的,并示是交易,而是交易ID!!

广播的方式并不是把交易直接广播到其它节点,而是广播ID,然后其它节点到这个节点来拉取交易信息!!

广播缓存,使用guave cache,最老淘汰机制,如果超过MAX_TRX_CACHE_SIZE大小则老数据会丢弃,已经验证过这个场景,不过一般超达不到这个限制,只有在极端测试环境下能达到。

数据也就保留1H,也就是超时就丢弃。

重要成员变量

代码语言:javascript
复制
private final int MAX_TRX_CACHE_SIZE = 50_000;
// 广播缓存,MAX_TRX_CACHE_SIZE
// 提供缓存供外部获取、验重等作用
private Cache<Item, Message> trxCache = CacheBuilder.newBuilder()
    .maximumSize(MAX_TRX_CACHE_SIZE).expireAfterWrite(1, TimeUnit.HOURS)
    .recordStats().build();

// invToSpread 最大限制
private final int MAX_SPREAD_SIZE = 1_000
// 待发送队列
private ConcurrentHashMap<Item, Long> invToSpread = new ConcurrentHashMap<>();

广播逻辑

可以广播blocktransaction数据。

代码语言:javascript
复制
public void broadcast(Message msg) {

   if (fastForward) {
     return;
   }

   // 校验交易缓存大小,这里会限制,不过一般超不过这个限制,可以适当调大或调小
   if (invToSpread.size() > MAX_SPREAD_SIZE) {
     logger.warn("Drop message, type: {}, ID: {}", msg.getType(), msg.getMessageId());
     return;
   }

   Item item;
   if (msg instanceof BlockMessage) {
     BlockMessage blockMsg = (BlockMessage) msg;
     item = new Item(blockMsg.getMessageId(), InventoryType.BLOCK);
     logger.info("Ready to broadcast block {}", blockMsg.getBlockId().getString());
     blockMsg.getBlockCapsule().getTransactions().forEach(transactionCapsule -> {
       Sha256Hash tid = transactionCapsule.getTransactionId();
       invToSpread.remove(tid);
       trxCache.put(new Item(tid, InventoryType.TRX),
           new TransactionMessage(transactionCapsule.getInstance()));
     });
     blockCache.put(item, msg);
   } else if (msg instanceof TransactionMessage) {
     TransactionMessage trxMsg = (TransactionMessage) msg;
     // 注意,trxMsg.getMessageId() 是交易id: transactionCapsule.getTransactionId()
     // 也就是这里构建了一条广播消息的Item,包含了:交易ID、交易类型 TRX
     item = new Item(trxMsg.getMessageId(), InventoryType.TRX);
     trxCount.add();
     trxCache.put(item, new TransactionMessage(trxMsg.getTransactionCapsule().getInstance()));
   } else {
     logger.error("Adv item is neither block nor trx, type: {}", msg.getType());
     return;
   }

   invToSpread.put(item, System.currentTimeMillis());

   if (InventoryType.BLOCK.equals(item.getType())) {
     consumerInvToSpread();
   }
 }

拉取数据 FetchInvDataMsgHandler

假设上面的的交易通过节点A广播到了节点B,节点B收到消息后,就会来拉取直正的交易数据。

B 节点会发送 FETCH_INV_DATA 类型消息来A节点获数据。

核心方法在:FetchInvDataMsgHandler.processMessage

代码语言:javascript
复制
for (Sha256Hash hash : fetchInvDataMsg.getHashList()) {
  Item item = new Item(hash, type);
  // 遍历 advService 的缓存数据,getMessage 中包含之前已发送的数据
  Message message = advService.getMessage(item);
  if (message == null) {
    try {
      // type: block、trx
      message = tronNetDelegate.getData(hash, type);
    } catch (Exception e) {
      throw new P2pException(TypeEnum.DB_ITEM_NOT_FOUND,
              "Fetch item " + item + " failed. reason: " + e.getMessage());
    }
  }
  ···
}

FetchInvDataMsgHandler.processMessageAdvServicetrxCache中拉取之前缓存的数据,这样就完成了一个广播到获取数据的流程。

AdvService.getMessage

代码语言:javascript
复制
public Message getMessage(Item item) {
  if (item.getType() == InventoryType.TRX) {
    return trxCache.getIfPresent(item);
  } else {
    return blockCache.getIfPresent(item);
  }
}

发送 consumerInvToSpread

发送数据由: consumerInvToSpread 方法执行,通过:

  1. spreadExecutor 定时执行
  2. broadcast 中判断类型为InventoryType.BLOCK则立即发送
代码语言:javascript
复制
private synchronized void consumerInvToSpread() {

  List<PeerConnection> peers = tronNetDelegate.getActivePeer().stream()
      .filter(peer -> !peer.isNeedSyncFromPeer() && !peer.isNeedSyncFromUs())
      .collect(Collectors.toList());

  if (invToSpread.isEmpty() || peers.isEmpty()) {
    return;
  }

  InvSender invSender = new InvSender();

  invToSpread.forEach((item, time) -> peers.forEach(peer -> {
    if (peer.getAdvInvReceive().getIfPresent(item) == null
        && peer.getAdvInvSpread().getIfPresent(item) == null
        && !(item.getType().equals(InventoryType.BLOCK)
        && System.currentTimeMillis() - time > BLOCK_PRODUCED_INTERVAL)) {
      peer.getAdvInvSpread().put(item, Time.getCurrentMillis());
      invSender.add(item, peer);
    }
    // 移除本次发送的数据,这样才不会越来越大
    invToSpread.remove(item);
  }));
  // 发送
  invSender.sendInv();
}

Transaction 交易结构

tron链使用protobuf进行序列化和反序列化,观察一下Transaction的结构:

代码语言:javascript
复制
message Transaction {
  message Contract {
    enum ContractType {
      AccountCreateContract = 0;
      // 普通交易
      TransferContract = 1;
      // TRC10资产交易
      TransferAssetContract = 2;
      VoteAssetContract = 3;
      VoteWitnessContract = 4;
      WitnessCreateContract = 5;
      AssetIssueContract = 6;
      // 7 呢?
      WitnessUpdateContract = 8;
      ParticipateAssetIssueContract = 9;
      AccountUpdateContract = 10;
      // 冻结
      FreezeBalanceContract = 11;
      // 解冻
      UnfreezeBalanceContract = 12;
      // 提取奖励
      WithdrawBalanceContract = 13;
      UnfreezeAssetContract = 14;
      UpdateAssetContract = 15;
      ProposalCreateContract = 16;
      ProposalApproveContract = 17;
      ProposalDeleteContract = 18;
      SetAccountIdContract = 19;
      CustomContract = 20;
      CreateSmartContract = 30;
      TriggerSmartContract = 31;
      GetContract = 32;
      UpdateSettingContract = 33;
      ExchangeCreateContract = 41;
      ExchangeInjectContract = 42;
      ExchangeWithdrawContract = 43;
      ExchangeTransactionContract = 44;
      UpdateEnergyLimitContract = 45;
      AccountPermissionUpdateContract = 46;
      ClearABIContract = 48;
      UpdateBrokerageContract = 49;
      ShieldedTransferContract = 51;
      MarketSellAssetContract = 52;
      MarketCancelOrderContract = 53;
    }
    ContractType type = 1;
    google.protobuf.Any parameter = 2;
    bytes provider = 3;
    bytes ContractName = 4;
    int32 Permission_id = 5;
  }

  message Result {
    enum code {
      SUCESS = 0;
      FAILED = 1;
    }
    enum contractResult {
      DEFAULT = 0;
      SUCCESS = 1;
      REVERT = 2;
      BAD_JUMP_DESTINATION = 3;
      OUT_OF_MEMORY = 4;
      PRECOMPILED_CONTRACT = 5;
      STACK_TOO_SMALL = 6;
      STACK_TOO_LARGE = 7;
      ILLEGAL_OPERATION = 8;
      STACK_OVERFLOW = 9;
      OUT_OF_ENERGY = 10;
      OUT_OF_TIME = 11;
      JVM_STACK_OVER_FLOW = 12;
      UNKNOWN = 13;
      TRANSFER_FAILED = 14;
      INVALID_CODE = 15;
    }
    int64 fee = 1;
    code ret = 2;
    contractResult contractRet = 3;

    string assetIssueID = 14;
    int64 withdraw_amount = 15;
    int64 unfreeze_amount = 16;
    int64 exchange_received_amount = 18;
    int64 exchange_inject_another_amount = 19;
    int64 exchange_withdraw_another_amount = 20;
    int64 exchange_id = 21;
    int64 shielded_transaction_fee = 22;


    bytes orderId = 25;
    repeated MarketOrderDetail orderDetails = 26;
  }

  message raw {
    bytes ref_block_bytes = 1;
    int64 ref_block_num = 3;
    bytes ref_block_hash = 4;
    int64 expiration = 8;
    repeated authority auths = 9;
    // data not used
    bytes data = 10;
    //only support size = 1,  repeated list here for extension
    repeated Contract contract = 11;
    // scripts not used
    bytes scripts = 12;
    int64 timestamp = 14;
    int64 fee_limit = 18;
  }

  raw raw_data = 1;
  // only support size = 1,  repeated list here for muti-sig extension
  repeated bytes signature = 2;
  repeated Result ret = 5;
}

交易广播播代码:

TronNetService.java

AdvService.java

总结

了解这块代码的意义在于知道交易是怎么接收、处理、广播的,了解交易在所以节点之间的处理、流转。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-11-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 接收交易
  • TronNetService.onMessage 分发消息
  • TransactionsMsgHandler
    • processMessage 消息处理入口
      • 智能合约处理 handleSmartContract
        • 交易处理、广播 TransactionsMsgHandler.handleTransaction
          • 广播数据 AdvService.broadcast
            • 拉取数据 FetchInvDataMsgHandler
              • 发送 consumerInvToSpread
                • Transaction 交易结构
                • 总结
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档