多线程设计模式解读6-single threaded Execution模式(附分布式环境下的操作)

Single Threaded Execution模式主要是用于确保同一时间内只能让一个线程执行处理,说通俗点就是对synchronized的标准化使用方式,这是比较基础的,所以我们前面重点介绍下如何保证同一个Jvm进程内的多线程同步,后面扩展开来,保证多个Jvm进程间多线程同步(分布式环境)。两者有很大的相似性。

单jvm进程下:

先看下一个简单的例子:

public class Ticket {

  private int counter = 100;

  public  void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
    }else{
      System.out.println("票已售完");
    }
  }

}


public class StationThread  extends Thread{
  Ticket ticket;

  public StationThread(Ticket ticket) {
    this.ticket = ticket;
  }
  @Override
  public void run() {
    while (true) {
      try{
        Thread.sleep(500);
      }catch(InterruptedException e){
        e.printStackTrace();
      }
      ticket.dec();
    }
  }

}



public class Main {

  public static void main(String[] args) {
    System.out.println("Testing...");
    Ticket ticket = new Ticket();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
  }
}

这里打印结果可知,执行明显是错误的:

Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-1号窗口卖出:97号票
Thread-2号窗口卖出:97号票
Thread-0号窗口卖出:97号票
Thread-2号窗口卖出:96号票
Thread-1号窗口卖出:96号票
Thread-0号窗口卖出:96号票
......

为什么会出错呢?因为Ticket不是线程安全的,this.counter--并不是一个原子性的操作,其中包含了读取,修改,写入,多个线程执行的时候,这些命令会交错执行,导致执行结果与预期不一致。

接下来,我们改下Ticket的方法:

public synchronized void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
    }else{
      System.out.println("票已售完");
    }
  }

添加了synchronized后,执行结果正常:

Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-0号窗口卖出:97号票
Thread-1号窗口卖出:96号票
Thread-2号窗口卖出:95号票
Thread-0号窗口卖出:94号票
Thread-1号窗口卖出:93号票
Thread-2号窗口卖出:92号票
Thread-1号窗口卖出:91号票
Thread-2号窗口卖出:90号票
Thread-0号窗口卖出:89号票
Thread-1号窗口卖出:88号票
Thread-0号窗口卖出:87号票
Thread-2号窗口卖出:86号票
Thread-1号窗口卖出:85号票
Thread-2号窗口卖出:84号票
Thread-0号窗口卖出:83号票
Thread-1号窗口卖出:82号票
Thread-2号窗口卖出:81号票
Thread-0号窗口卖出:80号票
Thread-1号窗口卖出:79号票
Thread-2号窗口卖出:78号票
Thread-0号窗口卖出:77号票
......

synchronized保证了方法只能由一个线程,防止了由多个线程交错执行的情况。我们知道,编写线程安全的代码,核心在于对状态访问操作进行管理,特别是共享和可变的状态,这里Ticket就是一个共享资源(SharedResource),通过single thread execution模式,将非安全的方法声明为synchronized方法,确保同一时间只被一个线程访问。

使用时注意事项:

1、死锁问题:

死锁指多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。就如同一座桥,只能容纳一辆车,有两辆车,相向而行,分到桥的中途,需要占据对方的空间才能通过。因此,就一直处于阻塞状态。

死锁主要是由于多个线程加锁顺序不一致导致的,可以从这里角度分析,防止死锁。

2、性能

获取锁花费时间,线程冲突会引起等待,为了提高性能,需要管理好锁的临界区,确定同步代码块的合理大小。

多jvm进程下:

分布式锁的方式:

一、通过如下命令在redis中实现分布式锁的功能

SET key value NX EX max-lock-time

其中NX是操作模式,表示只在键不存在时,才对键进行设置操作。

EX max-lock-time用于设置键的过期时间为max-lock-time秒。

这个命令连续两次执行结果如下:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

test-redis:0>SET not-exists-key "value" NX EX 60
NULL

过六十秒后再执行:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

这样,当一个线程执行命令成功,说明key原本不存在,该线程成功得到了锁;当设置失败时,说明key已经存在,该线程抢锁失败。

当到达过期时间,或者key被删除(del),说明锁被释放,其他线程可以继续执行这个命令来获取锁。

通过redis实现分布式锁有许多实现细节需要注意的:

1、很多人会通过setnx代替SET key value NX命令,但前者没有设置过期时间的参数,因此设置key值和设置过期时间便成为一个复合操作,不具备原子性,当一个线程设置了key值,但未设置过期时间,这时相关的节点挂了,但key一直存在,那其他线程就永远无法获取这个锁了(死锁)。

2、过期时间很难设置,如果设置短了,假设获得锁的A线程的任务还没执行完成,这时候锁就被释放了,其他线程就会获得锁,导致难以预料的一系列后果。如A线程执行完后误删了后一个线程的锁,共享数据被破坏等等。对此,我们可以通过开一个守护线程,当线程任务未执行完成,给锁续期。

二、zookeeper实现分布式锁

zookeeper分布式锁,实现更加完善,封装更好一点,因此,使用更加方便。

首先介绍下zookeeper分布式锁的实现原理。

为了构建这个锁,zookeeper创建一个持久的znode,它将作为父节点。试图获得锁的客户端将在父节点下面创建顺序的、临时的子节点。锁是由子节点具有最低的序列号的客户端进程拥有的。在图1中,锁节点有三个子节点,而节点1在这个时间点拥有锁,因为它的序列号是最低的。如果客户端创建的节点不是最小节点,就获得该节点的上一顺序节点,并给它注册watcher,同时在这里阻塞,等待监听事件的发生。当完成之后,关闭ZooKeeper连接,进而可以引发监听事件,释放该锁(在删除节点1之后,锁被释放), 然后拥有节点2的客户端拥有这个锁,以此类推。

zookeeper实现类似等待队列的机制,大大提升了抢锁的效率。

另外我们来看下,zookeeper有没有类似redis分布式锁那样的问题。我们发现它是不需要设置过期时间的,当任务完成时,客户端会删除节点,进而释放锁;当客户端挂掉,相应的临时会自动删除,锁被释放,其下一个序列的节点会收到通知,获取锁;当连接中断时则根据配置的重试机制重新连接。

Apache Curator,包含了对zookeeper分布式锁的实现,下面是使用代码,有兴趣可以研究下源码:

1、创建一个重试策略,然后使用CuratorFrameworkFactory.newClient()来获得CuratorFramework的实例

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);

CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);

client.start();

2、为特定的锁路径(lockPath)创建一个进程互斥锁,获取锁,执行一些操作,然后释放锁。

InterProcessLock lock = new InterProcessMutex(client, lockPath);

if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {

  try {

    // do work while we hold the lock

  } catch (Exception ex) {

    // handle exceptions as appropriate

  } finally {

    lock.release();

  }

} else {

  // we timed out waiting for lock, handle appropriately

}

3、不要忘记关闭client

client.close();

本文分享自微信公众号 - java达人(drjava)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-10-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏性能与架构

Redis的消息机制 - 发布订阅

发布订阅(pub/sub)是一种消息通信模式,主要目的是解除消息发布者、消息订阅者之间的耦合 pub/sub的特点 (1)时间非耦合 发布者和订阅者不必同时在线...

546120
来自专栏醒者呆

Go并发模式:管道与取消

关键字:Go语言,管道,取消机制,并发,sync.WaitGroup,包引用,通道,defer,select GO并发模式:管道与取消 简介 Go的并发能...

33560
来自专栏数据之美

玩转 SHELL 脚本之:Shell 命令 Buffer 知多少?

1、问题: 下午有同学问了这么一个问题: tail -n +$(tail -n1 /root/tmp/n) -F /root/tmp/ip.txt 2>...

51660
来自专栏FreeBuf

一名代码审计新手的实战经历与感悟

blueCMS介绍 个人认为,作为一个要入门代码审计的人,审计流程应该从简单到困难,逐步提升。因此我建议大家的审计流程为——DVWA——blueCMS——其他小...

44760
来自专栏应用案例

IT面试干货:PHP面试题汇总及答案

随着近两年来互联网潮流的发展,不少人选择php程序开发的学习。所以今天济南IT培训优就业的老师将与大家一起聊一聊PHP面试会问什么?、 ? PHP程序员经典面试...

42790
来自专栏IT技术精选文摘

如何设计一个 RPC 系统

24880
来自专栏美团技术团队

Android动态日志系统Holmes

背景 美团点评公司是全球领先的一站式生活服务平台,为6亿多消费者和超过450万优质商户提供连接线上线下的电子商务网络。美团点评的业务覆盖了超过200个丰富品类和...

728100
来自专栏编程

深入认识 vue-cli:能做的不仅仅是初始化 vue 工程

相信对于大部分使用过VueJS的同学来说, 是他们非常熟悉的一个工具。借助 ,我们通过非常简单的问答形式,方便地初始化一个vue工程,完全不需要担心繁复的web...

24880
来自专栏java一日一条

我的编码习惯 - 参数校验和国际化规范

今天我们说说参数校验和国际化,这些代码没有什么技术含量,却大量充斥在业务代码上,很可能业务代码只有几行,参数校验代码却有十几行,非常影响代码阅读,所以很有必要把...

11610
来自专栏韩伟的专栏

如何设计一个 RPC 系统

RPC 是一种方便的网络通信编程模型,由于和编程语言的高度结合,大大减少了处理网络数据的复杂度,让代码可读性也有可观的提高。本文就是通过分析几种流行的 RPC ...

10.9K90

扫码关注云+社区

领取腾讯云代金券