首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程设计模式解读6-single threaded Execution模式(附分布式环境下的操作)

多线程设计模式解读6-single threaded Execution模式(附分布式环境下的操作)

作者头像
java达人
发布2018-10-25 11:34:17
6620
发布2018-10-25 11:34:17
举报
文章被收录于专栏:java达人java达人

Single Threaded Execution模式主要是用于确保同一时间内只能让一个线程执行处理,说通俗点就是对synchronized的标准化使用方式,这是比较基础的,所以我们前面重点介绍下如何保证同一个Jvm进程内的多线程同步,后面扩展开来,保证多个Jvm进程间多线程同步(分布式环境)。两者有很大的相似性。

单jvm进程下:

先看下一个简单的例子:

public class Ticket {

  private int counter = 100;

  public  void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
    }else{
      System.out.println("票已售完");
    }
  }

}


public class StationThread  extends Thread{
  Ticket ticket;

  public StationThread(Ticket ticket) {
    this.ticket = ticket;
  }
  @Override
  public void run() {
    while (true) {
      try{
        Thread.sleep(500);
      }catch(InterruptedException e){
        e.printStackTrace();
      }
      ticket.dec();
    }
  }

}



public class Main {

  public static void main(String[] args) {
    System.out.println("Testing...");
    Ticket ticket = new Ticket();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
    new StationThread(ticket).start();
  }
}

这里打印结果可知,执行明显是错误的:

Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-1号窗口卖出:97号票
Thread-2号窗口卖出:97号票
Thread-0号窗口卖出:97号票
Thread-2号窗口卖出:96号票
Thread-1号窗口卖出:96号票
Thread-0号窗口卖出:96号票
......

为什么会出错呢?因为Ticket不是线程安全的,this.counter--并不是一个原子性的操作,其中包含了读取,修改,写入,多个线程执行的时候,这些命令会交错执行,导致执行结果与预期不一致。

接下来,我们改下Ticket的方法:

public synchronized void dec() {
    if(counter>0) {
      System.out.println(Thread.currentThread().getName() + "号窗口卖出:" + this.counter-- + "号票");
    }else{
      System.out.println("票已售完");
    }
  }

添加了synchronized后,执行结果正常:

Testing...
Thread-0号窗口卖出:100号票
Thread-1号窗口卖出:99号票
Thread-2号窗口卖出:98号票
Thread-0号窗口卖出:97号票
Thread-1号窗口卖出:96号票
Thread-2号窗口卖出:95号票
Thread-0号窗口卖出:94号票
Thread-1号窗口卖出:93号票
Thread-2号窗口卖出:92号票
Thread-1号窗口卖出:91号票
Thread-2号窗口卖出:90号票
Thread-0号窗口卖出:89号票
Thread-1号窗口卖出:88号票
Thread-0号窗口卖出:87号票
Thread-2号窗口卖出:86号票
Thread-1号窗口卖出:85号票
Thread-2号窗口卖出:84号票
Thread-0号窗口卖出:83号票
Thread-1号窗口卖出:82号票
Thread-2号窗口卖出:81号票
Thread-0号窗口卖出:80号票
Thread-1号窗口卖出:79号票
Thread-2号窗口卖出:78号票
Thread-0号窗口卖出:77号票
......

synchronized保证了方法只能由一个线程,防止了由多个线程交错执行的情况。我们知道,编写线程安全的代码,核心在于对状态访问操作进行管理,特别是共享和可变的状态,这里Ticket就是一个共享资源(SharedResource),通过single thread execution模式,将非安全的方法声明为synchronized方法,确保同一时间只被一个线程访问。

使用时注意事项:

1、死锁问题:

死锁指多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放。由于线程被无限期地阻塞,因此程序不可能正常终止。就如同一座桥,只能容纳一辆车,有两辆车,相向而行,分到桥的中途,需要占据对方的空间才能通过。因此,就一直处于阻塞状态。

死锁主要是由于多个线程加锁顺序不一致导致的,可以从这里角度分析,防止死锁。

2、性能

获取锁花费时间,线程冲突会引起等待,为了提高性能,需要管理好锁的临界区,确定同步代码块的合理大小。

多jvm进程下:

分布式锁的方式:

一、通过如下命令在redis中实现分布式锁的功能

SET key value NX EX max-lock-time

其中NX是操作模式,表示只在键不存在时,才对键进行设置操作。

EX max-lock-time用于设置键的过期时间为max-lock-time秒。

这个命令连续两次执行结果如下:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

test-redis:0>SET not-exists-key "value" NX EX 60
NULL

过六十秒后再执行:

test-redis:0>SET not-exists-key "value" NX EX 60
OK

这样,当一个线程执行命令成功,说明key原本不存在,该线程成功得到了锁;当设置失败时,说明key已经存在,该线程抢锁失败。

当到达过期时间,或者key被删除(del),说明锁被释放,其他线程可以继续执行这个命令来获取锁。

通过redis实现分布式锁有许多实现细节需要注意的:

1、很多人会通过setnx代替SET key value NX命令,但前者没有设置过期时间的参数,因此设置key值和设置过期时间便成为一个复合操作,不具备原子性,当一个线程设置了key值,但未设置过期时间,这时相关的节点挂了,但key一直存在,那其他线程就永远无法获取这个锁了(死锁)。

2、过期时间很难设置,如果设置短了,假设获得锁的A线程的任务还没执行完成,这时候锁就被释放了,其他线程就会获得锁,导致难以预料的一系列后果。如A线程执行完后误删了后一个线程的锁,共享数据被破坏等等。对此,我们可以通过开一个守护线程,当线程任务未执行完成,给锁续期。

二、zookeeper实现分布式锁

zookeeper分布式锁,实现更加完善,封装更好一点,因此,使用更加方便。

首先介绍下zookeeper分布式锁的实现原理。

为了构建这个锁,zookeeper创建一个持久的znode,它将作为父节点。试图获得锁的客户端将在父节点下面创建顺序的、临时的子节点。锁是由子节点具有最低的序列号的客户端进程拥有的。在图1中,锁节点有三个子节点,而节点1在这个时间点拥有锁,因为它的序列号是最低的。如果客户端创建的节点不是最小节点,就获得该节点的上一顺序节点,并给它注册watcher,同时在这里阻塞,等待监听事件的发生。当完成之后,关闭ZooKeeper连接,进而可以引发监听事件,释放该锁(在删除节点1之后,锁被释放), 然后拥有节点2的客户端拥有这个锁,以此类推。

zookeeper实现类似等待队列的机制,大大提升了抢锁的效率。

另外我们来看下,zookeeper有没有类似redis分布式锁那样的问题。我们发现它是不需要设置过期时间的,当任务完成时,客户端会删除节点,进而释放锁;当客户端挂掉,相应的临时会自动删除,锁被释放,其下一个序列的节点会收到通知,获取锁;当连接中断时则根据配置的重试机制重新连接。

Apache Curator,包含了对zookeeper分布式锁的实现,下面是使用代码,有兴趣可以研究下源码:

1、创建一个重试策略,然后使用CuratorFrameworkFactory.newClient()来获得CuratorFramework的实例

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMills, maxRetries);

CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);

client.start();

2、为特定的锁路径(lockPath)创建一个进程互斥锁,获取锁,执行一些操作,然后释放锁。

InterProcessLock lock = new InterProcessMutex(client, lockPath);

if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {

  try {

    // do work while we hold the lock

  } catch (Exception ex) {

    // handle exceptions as appropriate

  } finally {

    lock.release();

  }

} else {

  // we timed out waiting for lock, handle appropriately

}

3、不要忘记关闭client

client.close();
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-10-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档