前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >仿写@ScheduleLock 定时任务

仿写@ScheduleLock 定时任务

作者头像
分享干货的你
发布2021-05-11 10:34:51
6820
发布2021-05-11 10:34:51
举报
文章被收录于专栏:分享干货的你分享干货的你

最近公司在搞分布式的定时任务, 怎么满足分布式的定时任务锁。 我看了大量的开源的代码。 https://github.com/lukas-krecan/ShedLock 感觉老外写的非常的不错。

其实底层也就是分布式锁+aop 的切片来实现的。那既然别人也能实现。 那我们也可以的。 这里分布式锁我们使用zookeeper 来实现。具体的客户端我使用zookeeper 的curator 来实现。 官网地址 http://curator.apache.org/getting-started.html , 写的比较详细,清晰。 分布式锁也有好几种形式。 下面我们就来写代码实现一下。

  1. 首先安装zookeeper

2. 我们看到了是一个空的客户端, 现在已经连接上了。

3, 新建一个maven Springboot工程 导入curator 的客户端pom

4. 编写客户端的配置类

代码语言:javascript
复制
@Component
public class ZkClientConfig {

     // 从配置文件拿取zk 的连接
     @Value("${zk.url}")
     private String zkConnection;

     private static CuratorFramework client;

     @PostConstruct
     public void startClient(){
         // 100 毫秒从新连接三次。
         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
         client = CuratorFrameworkFactory.newClient(zkConnection, retryPolicy);
         client.start();
     }

}

要是看见出现这种,就说明连接成功了。

在编写可重入锁的service

代码语言:javascript
复制
package com.example.demo.service;

import com.example.demo.config.ZkClientConfig;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.TimeUnit;

@Service
public class ZkClientService {


    private static  InterProcessMutex lock;

    /**
     * 尝试获取锁
     * @param time
     * @param timeUnit
     * @return
     */
    public boolean lock(long time , TimeUnit timeUnit,String lockPath){
          try {
              lock = new InterProcessMutex(ZkClientConfig.client, lockPath);

           return    lock.acquire(time,timeUnit);
          }catch (Exception e){
              return  false;
          }
    }

    /**
     *  创建父节点和子节点
     * @param lockPath
     * @throws Exception
     */
    public void create(String lockPath) throws Exception {
        ZkClientConfig.client.create().forPath(lockPath,"test".getBytes());

    }

   /**/ 遍历路径下面的节点
    public int SizeForPath(String path) throws Exception {
       return ZkClientConfig.client.getChildren().forPath(path).size();
    }

    /**/ 判断路径是否存在
    public Boolean isExist(String path) throws Exception {
        return !ObjectUtils.isEmpty(ZkClientConfig.client.checkExists().forPath(path));
    }



    /**
     * 尝试获取锁
     * @return
     */
    public void  lockNotTime(String lockPath){
        try {
            lock = new InterProcessMutex(ZkClientConfig.client, lockPath);
            lock.acquire();
        }catch (Exception e){
            e.printStackTrace();
        }
    }


    /**
     *  释放锁
     */
    public  void releaseLock(){
        try {
            lock.release();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

启动项目,调用初始化lock 的方法 看一下zk 的界面

但是这个节点释放锁就会消失, 关闭zk 也会消失。说明是临时的。 根据后缀来看应该是顺序的节点。

好了, 下面我们就来自定义注解了。

代码语言:javascript
复制
@Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface DCScheduleLock {

    /**
     *  定时任务的名称,其实也就是zk的节点,具有唯一性,不能重复。
     * @return
     */
    String name() ;

    /**
     *  锁的时长
     * @return
     */
    long time();

    /**
     * 锁时长单位
     * @return
     */
    TimeUnit unit();

}

在写一下AOP切片

代码语言:javascript
复制
@Component
@Aspect
public class SchedulelockAop {

    // path 路径前缀
    private final static String PATH_PREFIX = "/";

    // 切片的注解
    @Pointcut("@annotation(com.example.demo.annotation.DCScheduleLock)")
    public void dcScheduleLock() {

    }

    @Autowired
    private ZkClientService zkClientService;

   //阻塞多长时间
    private static ThreadLocal<Long> threadLocalLong = new ThreadLocal<>();

    // 判断单个还是多个实列, 单个实例不走延迟
    private static ThreadLocal<Boolean> single = new ThreadLocal<>();



    @Around( value = "dcScheduleLock()")
    public Object lockAround(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature)joinPoint.getSignature();
        Method method = signature.getMethod();
        DCScheduleLock dcScheduleLock = AnnotationUtils.findAnnotation(method, DCScheduleLock.class);
        Scheduled scheduled = AnnotationUtils.findAnnotation(method, Scheduled.class);
        if(!ObjectUtils.isEmpty(scheduled)) {
           // 判断是否存在根路径,不存在就创建
            if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name())){
                zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name());
            }
            //  判断当前根路径,也就是子路径的个数
            int size =  zkClientService.SizeForPath(PATH_PREFIX+method+dcScheduleLock.name());
            // 判断是单个实列还是多个
            if(size<2){
                if(!zkClientService.isExist(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()))){
                    zkClientService.create(PATH_PREFIX+method+dcScheduleLock.name()+PATH_PREFIX+String.valueOf(Thread.currentThread().getId()));
                }
                single.set(true);
            }else {
                single.set(false);
            }
            System.out.println("实例的个数: "+size);
            Object obj = null;
            System.out.println(" 线程id "+ Thread.currentThread().getId());
            threadLocalLong.set(timeToLong(dcScheduleLock.unit(),dcScheduleLock.time(),scheduled.cron()));
            if(zkClientService.lock(dcScheduleLock.time(),dcScheduleLock.unit() ,PATH_PREFIX+ dcScheduleLock.name())){
                 // 抢到锁就执行方法,没有就不做任何处理。
                 obj = joinPoint.proceed();
            }
            return  obj;
             }else {
            System.out.println( Thread.currentThread().getName()+ "线程没有获取锁 ");
            return joinPoint.proceed();
        }
    }
  
    // 后置处理器,获取阻塞的时间才能释放锁
    @After(value = "dcScheduleLock()")
    public void after(JoinPoint joinPoint) throws InterruptedException {
        // 单室例节点不阻塞
        if(!single.get()){
            Thread.sleep(threadLocalLong.get());
            threadLocalLong.remove();
        }
       //切记一定要释放锁。 
        zkClientService.releaseLock();
        System.out.println( Thread.currentThread().getName()+ "线程释放锁成功 ");

    }


    /**
     *  这里先写死就是五秒钟, 其实是拿到cron  和 所得时间比较, 取最大的时间
     * @param timeUnit
     * @param time
     * @return
     */
    public  long timeToLong(TimeUnit timeUnit,long time ,String cron){
        return 1000*5;
    }
}

在写一下测试代码:

代码语言:javascript
复制
  @Scheduled(cron = "0/5 * * * * ?")
  @DCScheduleLock(name = "test",time =2,unit = TimeUnit.SECONDS)
  public void   test(){
    System.out.println(" current  thread --->>>>"+ new Date());
  }

单个机器测试:

没问题,两台机器测试我们看看啊。

这里的实列也是用节点来标识的,路径是schedulLock 的name 和方法名称。

看一下完美解决了, 一共两个实列。

后面还没写完,实列不应该是持久化的节点,也应该是临时的。

最后看一下zk

里面存放的使我们两个实列。

具体流程图

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-05-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 分享干货的你 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档