前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >风骚的操作:区块链监控个人账户即时在线充值

风骚的操作:区块链监控个人账户即时在线充值

作者头像
肉眼品世界
发布2020-12-02 10:26:27
6340
发布2020-12-02 10:26:27
举报
文章被收录于专栏:肉眼品世界

在区块链的世界里,常常很多时候用户需要充值,要不拉起钱包,要不支付到某个特定账号,这个时候可以监控合约交易记录实现实时到账,有的时候上某些网站的时候,至于是哪些网站,小编就不太好说了,有见过直接备注信息充值扫码支付到个人二维码,然后立马就会充值成功,那么这个是怎么实现的呢

当然是后台一个守护进程,然后实时监控到账情况,通过MEMO进行订单信息识别入款,springboot里新开一个守护进程很多种方式,但是小编我比较懒,就给大家介绍一种一个地方改代码就能实现的方式:

通过实现ApplicationRunner的多线程方式:

RechargeMonitorRunne.java

代码语言:javascript
复制
package io.xxxschedule;

import io.xxx.component.RechargeMonitorServer;
import io.xxx.configuration.AsyncConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

@Component
public class RechargeMonitorRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
    @Resource
    private RechargeMonitorServer monitorServer;
    @Resource
    private AsyncConfiguration threadConfig;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
        logger.info("计时器开启。。。。。。");
        while (true){
            monitoServer.tradeMonitor();
        // this.tradeMonitor();
            String s=df.format(new Date());
            //System.out.println(s);
            if ("23:50".equals(s)){//当时间为23:50
                logger.info("当前时间为:"+df.format(new Date())+"开启数据入库--------");
                break;//退出这个while(true)循环
            }
        }
    }
    @Async
    public Future<String> tradeMonitor() {
        System.out.println("\n\n----------------------------------------------");
        System.out.println(Thread.currentThread().getName() + "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "请求失败";

        //....你的业务逻辑
        //  return CompletableFuture.completedFuture(result);
        return null;
    }


}

监控合约接口,当然也是一个内部api,异步执行,配置异步多线程池:

AsyncConfiguration.java

代码语言:javascript
复制
package io.xxx.configuration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync  // 启用异步任务
public class AsyncConfiguration {

    // 声明一个线程池(并指定线程池的名字)
    @Bean("AsyncThread")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数5:线程池创建时候初始化的线程数
        executor.setCorePoolSize(5);
        //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(10);
        //缓冲队列500:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}

业务实现monitorServer.java,这个异步执行文件需要另外一个类分开才能执行:

代码语言:javascript
复制
package io.xxx.component;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpSession;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Service
public class RechargeMonitorServer {
    @Async("AsyncThread")
    public void tradeMonitor() {
        System.out.println("\n\n----------------------------------------------");
        System.out.println(Thread.currentThread().getName() + "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "请求失败";
        //....你的业务逻辑
      //  return CompletableFuture.completedFuture(result);
    }
}

一run,是可以跑起来,哐当,隔一会儿就会报错,报错信息如下:

代码语言:javascript
复制
2020-11-26 09:50:58.061  INFO 18692 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'AsyncThread'
RechargeMonitorRunner:

Caused by: org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@6c4ce583[Running, pool size = 10, active threads = 10, queued tasks = 466, completed tasks = 271803]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$$Lambda$654/1923626523@272778ae

原来while true里不停的加任务,任务满了就把这个守护线程挂了

怎么办,那么换一种写法捕捉队列满异常:

代码语言:javascript
复制
package io.xxx.schedule;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import io.xxx.Utils.ErrorAlarm;
import io.xxx.Utils.OkHttpUtils;
import io.xxx.component.RechargeMonitorServer;
import io.xxx.domain.Order;
import io.xxx.service.OrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

@Component
public class RechargeMonitorRunner implements ApplicationRunner {
    private static final Logger logger = LoggerFactory.getLogger(RechargeMonitorRunner.class);
    @Resource
    private RechargeMonitorServer monitorServer;
    @Value("${xxx.rechargeMonitor.url}")
    private  String rechargeMonitorUrl;
    @Value("${xxx.rechargeMonitor.monitorAccount}")
    private  String eosMonitorAccount;
    private ConcurrentHashMap requestMap= new ConcurrentHashMap();
    private String requestJson = "";
    @Autowired
    private OrderService orderService;
    @Autowired
    private ErrorAlarm errorAlarm;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        SimpleDateFormat df = new SimpleDateFormat("HH:mm");//设置日期格式
        ThreadPoolTaskExecutor threadPoolTaskExecutor = initThreadPoolTaskExecutor();
        requestMap.put("code",eosMonitorAccount);
        requestMap.put("scope",eosMonitorAccount);
        requestMap.put("table","comein");
        requestMap.put("json",true);
        requestJson = JSON.toJSONString(requestMap);
        while (true) {
         //   System.out.println("Run start thread name->" + Thread.currentThread().getName());
          //List<Future> taskFutureList = new ArrayList<>();
           this.tradeMonitor();
            /*
          Future future = threadPoolTaskExecutor.submit(() -> {
                try {
                    this.tradeMonitor();
                } catch (ExecutionException | InterruptedException e) {
                    e.printStackTrace();
                }
            });

            taskFutureList.add(future);
            for (Future future2 : taskFutureList) {
               String result = (String) future2.get();
               System.out.println(result);
            }*/
            System.out.println("任务队列任务数量: " + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());

            //this.tradeMonitor();
            // monitorServer.tradeMonitor();
            String s = df.format(new Date());
            if ("23:50".equals(s)) {//当时间为23:50
                logger.info("当前时间为:" + df.format(new Date()) + "开启数据入库--------");
                break;//退出这个while(true)循环
            }
            // 获取任务队列中的任务数量

        }
    }

    @Async
    public Future<String> tradeMonitor() throws ExecutionException, InterruptedException {
       // System.out.println("Runner thread name->" + Thread.currentThread().getName());
       // System.out.println("\n\n----------------------------------------------");
        System.out.println(Thread.currentThread().getName() + "正在处理请求 tradeMonitor");
        System.out.println("----------------------------------------------");
        String result = "";
        result = OkHttpUtils.httpPostJson(rechargeMonitorUrl,requestJson);
        //result = null ;
        if(result==null  || result.equals("")){
            //报警
            ConcurrentHashMap alarmMap = new ConcurrentHashMap();
            alarmMap.put("api_url",rechargeMonitorUrl);
            alarmMap.put("params",requestJson);
            alarmMap.put("method","POST");
            alarmMap.put("message","xxx错误");
            int alarmFlag = errorAlarm.remoteRequestAlarm(alarmMap);
            result = "";
        }
        JSONObject jo = JSON.parseObject(result);
        CopyOnWriteArrayList<Order> orderIdList =  new CopyOnWriteArrayList<>();
        for(int i = 0;i<jo.getJSONArray("rows").size();i++){
            Order order = new Order();
            JSONObject oneRecord = (JSONObject)jo.getJSONArray("rows").get(i);
            String memo = oneRecord.getString("memo");
            String quantity = oneRecord.getString("quantity");
            if(memo.indexOf("orderID")>=0){
                String[] orderIdInfo = memo.split("\\:");
                Long orderID = Long.valueOf(orderIdInfo[1]);
                String[] amountInfo = quantity.split("\\s");
                BigDecimal amount = new BigDecimal(amountInfo[0]);
                if(amountInfo[1].equals("USDT") || amountInfo[1].equals("USDE")){
                    order.setId(orderID);
                    order.setHash(amount);
                }
                else {
                    logger.info("RechargeMonitor record orderID:"+orderID+"is  NOTUSDTORUSDE");
                }
                orderIdList.add(order);

            }
        }
        int affectRows = orderService.confirmOrder(orderIdList);
        if(affectRows!=orderIdList.size()){
            logger.info("RechargeMonitor orderInfo :"+JSON.toJSONString(orderIdList)+"is  ORDERCONFIRMINCOMPLETE");
        }
        //  Thread.sleep(2000);
       // monitorServer.tradeMonitor();
        //....你的业务逻辑
         return CompletableFuture.completedFuture(result);
       // return null;
    }

    private ThreadPoolTaskExecutor initThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("xxxTread-");
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(10);
        threadPoolTaskExecutor.setQueueCapacity(500);
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                System.out.println("丢弃");
                logger.info("task full,reject,taskReject log");
            }
        });
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

}

一跑,飞快,还能把CPU和内存尽可能的使用,当然如果不想即时监控,使用定时任务每秒跑一次也是可以的

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 肉眼品世界 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云支付
云支付(Cloud Pay,CPay)为您提供开放、可靠的聚合收款技术服务和商户管理功能。云支付支持刷卡支付、扫码支付、一码多付多种支付方式。服务商也可使用云支付提供的 SDK 和 HTTPS 接口,将云支付集成进自己的系统中,为商户提供的个性化解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档