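A distributed lock is a way to synchronize access to shared resources across a distributed system. The parts of a distributed system often need to coordinate their actions. When different systems, or different hosts of the same system, share one resource or a group of resources, access to those resources usually has to be mutually exclusive so that the hosts do not interfere with each other and the data stays consistent. That is where a distributed lock comes in.
Pessimistic and optimistic locking in the database can also keep data shared by different hosts consistent, but they have the following problems:
The previous article showed a simple distributed lock built with redisson, an open-source Redis-based project. This article shows how to build one with Zookeeper.
The code can be viewed and downloaded from the LockZookeeper, LockSupport and LockTest components of the SpringBoot component-based build series at https://www.pomit.cn/java/spring/springcloud.html. LockSupport and LockTest are the test projects that work together with LockZookeeper.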
First of all, Zookeeper has to be installed.
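A distributed lock can be built on Zookeeper's ephemeral sequential nodes. The idea is roughly this: when a client wants to lock a method, it creates a unique ephemeral sequential node under the directory node that Zookeeper holds for that method. Deciding who holds the lock is simple: the client whose node has the smallest sequence number holds it. To release the lock, the client just deletes its ephemeral node. Because Zookeeper removes an ephemeral node when the client's session ends, this also avoids the deadlock that would otherwise occur when a service crashes without releasing its lock.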
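The "smallest sequence number wins" rule can be sketched without a Zookeeper server. The example below is only an in-memory illustration: the class name SequentialLockSketch and the `lock-` node prefix are made up for this sketch, and the list of child names stands in for what a client would read from the lock directory. It relies on the fact that Zookeeper appends a zero-padded 10-digit counter to sequential node names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// In-memory illustration only: no Zookeeper connection is made.
final class SequentialLockSketch {

    // Sequential node names end with a zero-padded 10-digit counter,
    // e.g. "lock-0000000003"; parse that suffix as the sequence number.
    static int sequenceOf(String nodeName) {
        return Integer.parseInt(nodeName.substring(nodeName.length() - 10));
    }

    // A client holds the lock when its node has the smallest sequence number
    // among all children of the lock directory.
    static boolean holdsLock(String myNode, List<String> children) {
        String smallest = children.stream()
                .min(Comparator.comparingInt(SequentialLockSketch::sequenceOf))
                .orElseThrow(() -> new IllegalStateException("no lock nodes"));
        return smallest.equals(myNode);
    }

    public static void main(String[] args) {
        // Hypothetical children of the lock directory, in arbitrary order.
        List<String> children = new ArrayList<>(Arrays.asList(
                "lock-0000000007", "lock-0000000003", "lock-0000000005"));

        System.out.println(holdsLock("lock-0000000003", children));
        System.out.println(holdsLock("lock-0000000005", children));

        // Releasing the lock (or losing the session) deletes the holder's node,
        // so the next smallest node becomes the holder.
        children.remove("lock-0000000003");
        System.out.println(holdsLock("lock-0000000005", children));
    }
}
```

In a real client the waiting nodes would typically watch for the deletion of a predecessor instead of polling the list; the sketch leaves that part out.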
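The LockSupport code can be downloaded from the LockSupport component of the SpringBoot component-based build series at https://www.pomit.cn/java/spring/springcloud.html; only the function of LockSupport is described here.
LockSupport is just a simple SpringBoot project that uses Spring Data Jpa for database access and exposes the following interfaces as services:
The LockZookeeper code can be downloaded from the LockZookeeper component of the SpringBoot component-based build series at https://www.pomit.cn/java/spring/springcloud.html.
Section 3 describes the LockZookeeper project in detail.
The LockTest code can be downloaded from the LockTest component of the SpringBoot component-based build series at https://www.pomit.cn/java/spring/springcloud.html; only the function of LockTest is described here.
LockTest is just a simple SpringBoot project that uses Feign to call LockZookeeper in order to test the distributed lock.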
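What such a test looks for can be shown with a small in-process simulation. This is not the LockTest project and uses neither Feign nor Zookeeper: threads stand in for the test clients, a ReentrantLock stands in for the distributed lock, and the class name ConsumeSimulation and the stock numbers are invented for the sketch. With the lock in place, the number of successful purchases should never exceed the initial stock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// In-process stand-in for "many clients consume the same goods at once".
final class ConsumeSimulation {
    private final Lock lock = new ReentrantLock(); // stands in for the distributed lock
    private int stock;

    ConsumeSimulation(int stock) {
        this.stock = stock;
    }

    // Check-then-act on the stock, done entirely while holding the lock.
    boolean consume(int num) {
        lock.lock();
        try {
            if (stock < num) {
                return false;
            }
            stock -= num;
            return true;
        } finally {
            lock.unlock();
        }
    }

    int remaining() {
        lock.lock();
        try {
            return stock;
        } finally {
            lock.unlock();
        }
    }

    // Fires `clients` concurrent requests for one item each and
    // returns how many of them succeeded.
    static int run(ConsumeSimulation shop, int clients) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        AtomicInteger successes = new AtomicInteger();
        for (int i = 0; i < clients; i++) {
            pool.submit(() -> {
                if (shop.consume(1)) {
                    successes.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("simulation interrupted", e);
        }
        return successes.get();
    }

    public static void main(String[] args) {
        ConsumeSimulation shop = new ConsumeSimulation(10);
        int ok = run(shop, 50);
        System.out.println("successful purchases: " + ok + ", remaining stock: " + shop.remaining());
    }
}
```

With 10 items and 50 requests, exactly 10 requests succeed and the stock ends at 0. Across several service instances the same check only holds if the lock is shared between processes, which is what the Zookeeper lock provides.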
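The following describes in detail how LockZookeeper implements the distributed lock.
It needs the database-related jars, jpa, spring-integration-zookeeper and zookeeper.
Because consul is used for service registration and discovery, spring-cloud-starter-consul-discovery and spring-cloud-starter-openfeign are needed as well.
The dependencies are as follows: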
<?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.pomit</groupId>
<artifactId>springcloudwork</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>LockZookeeper</artifactId>
<name>LockZookeeper</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<maven-jar-plugin.version>2.6</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zookeeper</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.34</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
</project>
The parent module's pom file is available at https://www.pomit.cn/spring/SpringCloudWork/pom.xml.
The configuration is written in a YAML file.
application.yml:
server:
  port: 8038
  useLock: true
spring:
  application:
    name: lockService
  cloud:
    consul:
      host: 127.0.0.1
      port: 8500
      discovery:
        prefer-ip-address: true
        healthCheckPath: /consul/health
  datasource:
    type: org.apache.commons.dbcp2.BasicDataSource
    dbcp2:
      max-wait-millis: 60000
      min-idle: 20
      initial-size: 2
      connection-properties: characterEncoding=utf8
      validation-query: SELECT 1
      test-while-idle: true
      test-on-borrow: true
      test-on-return: false
    driverClassName: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/boot?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC
    username: cff
    password: 123456
logging:
  level:
    root: INFO
zookeeper:
  connectionString: localhost:2181
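Here the application name is lockService and it listens on port 8038.
@FeignClient is used here to call the lockSupport project, either to fetch goods information or to consume goods.
GoodsInfoService: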
package cn.pomit.springbootwork.lock.zk.inter;
import java.util.List;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import cn.pomit.springbootwork.lock.zk.domain.TGoodInfo;
@FeignClient("lockSupport")
public interface GoodsInfoService {
@RequestMapping(method = RequestMethod.GET, value = "/goods/take", consumes = "application/json")
public TGoodInfo getGoodsInfo(@RequestParam("goodId") Integer goodId);
@RequestMapping(method = RequestMethod.GET, value = "/goods/page", consumes = "application/json")
public List<TGoodInfo> getGoodsList(@RequestParam("page") Integer page, @RequestParam("size") Integer size);
@RequestMapping(method = RequestMethod.GET, value = "/goods/consume", consumes = "application/json")
public Integer consume(@RequestParam("goodId") Integer goodId, @RequestParam("num") Integer num, @RequestParam("serverId") Integer serverId);
}
The consume business logic simply handles the goods and calls GoodsInfoService to make the goods-information service requests.
UserConsumeService:
package cn.pomit.springbootwork.lock.zk.service;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import cn.pomit.springbootwork.lock.zk.dao.TUserGoodDao;
import cn.pomit.springbootwork.lock.zk.domain.TGoodInfo;
import cn.pomit.springbootwork.lock.zk.domain.TUserGood;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
import cn.pomit.springbootwork.lock.zk.inter.ConsumeService;
import cn.pomit.springbootwork.lock.zk.inter.GoodsInfoService;
@Service("UserConsumeService")
public class UserConsumeService implements ConsumeService {
protected Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
GoodsInfoService goodsInfoService;
@Autowired
TUserGoodDao tUserGoodDao;
@Value("${server.port}")
private Integer serverId;
public ResultModel goodList(Integer page, Integer size) {
List<TGoodInfo> pageList = goodsInfoService.getGoodsList(page, size);
return ResultModel.ok(pageList);
}
public ResultModel goodDetail(Integer goodId) {
return ResultModel.ok(goodsInfoService.getGoodsInfo(goodId));
}
public ResultModel consume(Integer goodId, Integer num, String userName) {
log.info("Start consuming: userName:{}; goodId:{}, num:{}", userName, goodId, num);
TGoodInfo tGoodInfo = goodsInfoService.getGoodsInfo(goodId);
log.info("Before consuming: goodId:{} remaining:{}", goodId, tGoodInfo.getGoodNum());
Integer ret = goodsInfoService.consume(goodId, num, serverId);
log.info("Consume result: ret:{};", ret);
if (ret > 0) {
log.info("Saving the user's consumption record");
TUserGood tUserGood = new TUserGood();
tUserGood.setConsumeNum(num);
tUserGood.setGoodId(goodId);
tUserGood.setUserName(userName);
tUserGoodDao.save(tUserGood);
return ResultModel.ok();
}
return ResultModel.error("Consume failed!");
}
}
This business class also implements the ConsumeService interface:
ConsumeService:
package cn.pomit.springbootwork.lock.zk.inter;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
public interface ConsumeService {
public ResultModel goodList(Integer page, Integer size);
public ResultModel goodDetail(Integer goodId);
public ResultModel consume(Integer goodId, Integer num, String userName);
}
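The interface is implemented so that the proxy pattern can be used.
The DistributedLocker class implements the Zookeeper-based distributed lock:
Lock lock = zookeeperLockRegistry.obtain(LOCKER_GOODS_CONSUME); obtains the lock object for the key;
once lock.tryLock has acquired the lock, the invoke method of the LockWorker object passed in is called;
lock.unlock() releases the lock.
DistributedLocker: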
package cn.pomit.springbootwork.lock.zk.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
import cn.pomit.springbootwork.lock.zk.inter.LockWorker;
public class DistributedLocker {
    protected Logger log = LoggerFactory.getLogger(this.getClass());
    public final static String LOCKER_PREFIX = "lock:";
    public final static String LOCKER_GOODS_CONSUME = "GOODS_CONSUME";
    // Maximum time to wait for the lock, in seconds.
    public final static Long LOCKER_WAITE_TIME = 10L;
    // Not used below: Lock.tryLock takes only a wait time, not a lease time.
    public final static Long LOCKER_LOCK_TIME = 100L;
    @Value("${server.port}")
    private Integer serverId;
    @Value("${server.useLock}")
    private boolean useLock = true;
    /**
     * The lock client.
     */
    @Autowired
    protected ZookeeperLockRegistry zookeeperLockRegistry;
    public ResultModel tryLock(LockWorker lockWorker) {
        try {
            if (useLock) {
                log.info("Current server node: {}", serverId);
                log.info("Obtaining lock key: {}", LOCKER_PREFIX + LOCKER_GOODS_CONSUME);
                Lock lock = zookeeperLockRegistry.obtain(LOCKER_GOODS_CONSUME);
                log.info("Created lock key: {}, trying to lock", LOCKER_PREFIX + LOCKER_GOODS_CONSUME);
                // Wait at most LOCKER_WAITE_TIME seconds; the lock is held until unlock() is called.
                boolean success = lock.tryLock(LOCKER_WAITE_TIME, TimeUnit.SECONDS);
                if (success) {
                    try {
                        log.info("Acquired lock key: {}", LOCKER_PREFIX + LOCKER_GOODS_CONSUME);
                        return lockWorker.invoke();
                    } finally {
                        // Always release the lock, even if the worker throws.
                        log.info("Releasing lock key: {}", LOCKER_PREFIX + LOCKER_GOODS_CONSUME);
                        lock.unlock();
                    }
                }
                log.info("Failed to acquire lock key: {}", LOCKER_PREFIX + LOCKER_GOODS_CONSUME);
                return ResultModel.error("Failed to acquire the distributed lock!");
            } else {
                log.info("Current server node: {}", serverId);
                log.info("The current server is not using the distributed lock");
                return lockWorker.invoke();
            }
        } catch (Exception e) {
            log.error("Exception while acquiring the lock key", e);
            return ResultModel.error("Exception while acquiring the distributed lock!");
        }
    }
    public Integer getServerId() {
        return serverId;
    }
    public void setServerId(Integer serverId) {
        this.serverId = serverId;
    }
}
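The timed tryLock and the unlock in a finally block behave the same for any java.util.concurrent.locks.Lock, so they can be tried out locally. The sketch below is an illustration under assumptions, not part of the project: a ReentrantLock stands in for the lock returned by ZookeeperLockRegistry, and the names TimedLockSketch, runLocked and attemptWhileHeldElsewhere are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

// Local stand-in for the tryLock flow; a ReentrantLock replaces the Zookeeper lock.
final class TimedLockSketch {

    // Runs `work` while holding the lock, or returns `fallback` when the lock
    // cannot be acquired within `waitMillis` milliseconds.
    static <T> T runLocked(Lock lock, long waitMillis, Supplier<T> work, T fallback) {
        boolean acquired;
        try {
            acquired = lock.tryLock(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
        if (!acquired) {
            return fallback;
        }
        try {
            return work.get();
        } finally {
            lock.unlock(); // released even if work.get() throws
        }
    }

    // Another thread holds the lock for the whole attempt, so the timed
    // tryLock gives up and the fallback is returned.
    static String attemptWhileHeldElsewhere() {
        Lock lock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        holder.start();
        try {
            held.await(); // wait until the other thread really owns the lock
            return runLocked(lock, 100, () -> "done", "lock busy");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            release.countDown(); // let the holder thread finish
        }
    }

    public static void main(String[] args) {
        System.out.println(runLocked(new ReentrantLock(), 100, () -> "done", "lock busy"));
        System.out.println(attemptWhileHeldElsewhere());
    }
}
```

The first call finds the lock free and runs the work; the second times out after 100 ms and returns the fallback, which corresponds to the "failed to acquire the distributed lock" branch in DistributedLocker.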
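To keep the locking code from intruding too much into the business code, the proxy pattern is used here, with LockWorker wrapping the consume operation. If the distributed lock needs to be removed, only the proxy has to be commented out.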
UserConsumeServiceProxy:
package cn.pomit.springbootwork.lock.zk.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
import cn.pomit.springbootwork.lock.zk.inter.ConsumeService;
import cn.pomit.springbootwork.lock.zk.inter.LockWorker;
import cn.pomit.springbootwork.lock.zk.lock.DistributedLocker;
@Service("consumeService")
public class UserConsumeServiceProxy extends DistributedLocker implements ConsumeService {
@Autowired
UserConsumeService userConsumeService;
public ResultModel goodList(Integer page, Integer size) {
return userConsumeService.goodList(page, size);
}
public ResultModel goodDetail(Integer goodId) {
return userConsumeService.goodDetail(goodId);
}
public ResultModel consume(Integer goodId, Integer num, String userName) {
LockWorker lockWorker = new LockWorker() {
@Override
public ResultModel invoke() throws Exception {
log.info("Start remote consuming: userName:{}; goodId:{}, num:{}", userName, goodId, num);
return userConsumeService.consume(goodId, num, userName);
}
};
return tryLock(lockWorker);
}
}
The LockWorker interface:
package cn.pomit.springbootwork.lock.zk.inter;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
public interface LockWorker {
ResultModel invoke() throws Exception;
}
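Because LockWorker declares a single method, on Java 8 and later the anonymous class in the proxy could also be written as a lambda. The sketch below shows the two forms side by side using stand-ins so that it runs on its own: Worker mirrors LockWorker but returns a String instead of ResultModel, and execute is a hypothetical replacement for tryLock that skips the locking.

```java
// Stand-alone comparison of an anonymous class and a lambda for a
// single-method interface; none of these names belong to the project.
final class LambdaWorkerSketch {

    // Mirrors LockWorker, returning String so the sketch needs no ResultModel.
    @FunctionalInterface
    interface Worker {
        String invoke() throws Exception;
    }

    // Stand-in for tryLock: runs the worker and turns exceptions into a message.
    static String execute(Worker worker) {
        try {
            return worker.invoke();
        } catch (Exception e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        String userName = "test";

        // Anonymous-class form, as used in UserConsumeServiceProxy.
        Worker anonymous = new Worker() {
            @Override
            public String invoke() {
                return "consumed by " + userName;
            }
        };

        // Equivalent lambda form.
        Worker lambda = () -> "consumed by " + userName;

        System.out.println(execute(anonymous));
        System.out.println(execute(lambda));
    }
}
```

Both forms capture the effectively final local variable in the same way, so the choice is mostly a matter of readability.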
A web interface is exposed for the simulated client, LockTest, to test against.
LockZkRest:
package cn.pomit.springbootwork.lock.zk.web;
import java.security.Principal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import cn.pomit.springbootwork.lock.zk.dto.ResultModel;
import cn.pomit.springbootwork.lock.zk.inter.ConsumeService;
@RestController
@RequestMapping("/lock")
public class LockZkRest {
private Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
ConsumeService consumeService;
@RequestMapping(value = "/goodList")
public ResultModel goodList(@RequestParam(value = "page", required = false) Integer page,
@RequestParam(value = "size", required = false) Integer size) {
return consumeService.goodList(page, size);
}
@RequestMapping(value = "/goodDetail")
public ResultModel goodDetail(@RequestParam("goodId") Integer goodId) {
return consumeService.goodDetail(goodId);
}
@RequestMapping(value = "/consume")
public ResultModel consume(@RequestParam("goodId") Integer goodId, @RequestParam("num") Integer num,
Principal principal) {
String userName = "test";
if (principal != null && !StringUtils.isEmpty(principal.getName())) {
userName = principal.getName();
}
log.info("Received consume request: userName:{}; goodId:{}", userName, goodId);
return consumeService.consume(goodId, num, userName);
}
}
spring-integration-zookeeper requires two beans to be declared: CuratorFramework, the Zookeeper client tool, and ZookeeperLockRegistry, the distributed lock registry.
LockZkApplication:
package cn.pomit.springbootwork.lock.zk;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.zookeeper.config.CuratorFrameworkFactoryBean;
import org.springframework.integration.zookeeper.lock.ZookeeperLockRegistry;
import org.springframework.web.client.RestTemplate;
@EnableFeignClients
@EnableDiscoveryClient
@SpringBootApplication
public class LockZkApplication {
@Value("${zookeeper.connectionString}")
private String connectionString;
public static void main(String[] args) {
SpringApplication.run(LockZkApplication.class, args);
}
@Bean
@LoadBalanced
RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
CuratorFrameworkFactoryBean curatorFramework() {
return new CuratorFrameworkFactoryBean(connectionString);
}
@Bean
ZookeeperLockRegistry zookeeperLockRegistry(CuratorFramework client) {
return new ZookeeperLockRegistry(client);
}
}
TUserGoodDao:
package cn.pomit.springbootwork.lock.zk.dao;
import org.springframework.data.jpa.repository.JpaRepository;
import cn.pomit.springbootwork.lock.zk.domain.TUserGood;
public interface TUserGoodDao extends JpaRepository<TUserGood, Integer> {
}
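The health-check endpoint is only needed when consul is used as the registry; it serves the healthCheckPath configured in application.yml.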
HealthWeb:
package cn.pomit.springbootwork.lock.zk.web;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/consul")
public class HealthWeb {
@RequestMapping(value = "health", method = { RequestMethod.GET })
public String health() {
return "check health";
}
}
TUserGood:
package cn.pomit.springbootwork.lock.zk.domain;
import javax.persistence.Table;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Column;
@Entity
@Table(name = "t_user_good")
public class TUserGood {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private int id;
@Column(name = "good_id")
private int goodId;
@Column(name = "consume_num")
private int consumeNum;
@Column(name = "user_name")
private String userName;
public void setId(int id) {
this.id = id;
}
public int getId() {
return id;
}
public void setGoodId(int goodId) {
this.goodId = goodId;
}
public int getGoodId() {
return goodId;
}
public void setConsumeNum(int consumeNum) {
this.consumeNum = consumeNum;
}
public int getConsumeNum() {
return consumeNum;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getUserName() {
return userName;
}
}
TGoodInfo:
package cn.pomit.springbootwork.lock.zk.domain;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
public class TGoodInfo {
private Integer id;
private String goodName;
private String goodDesc;
@JsonFormat(pattern = "yyyy-MM-dd", timezone = "GMT+8")
private Date createTime;
private int goodNum;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public void setGoodName(String goodName) {
this.goodName = goodName;
}
public String getGoodName() {
return goodName;
}
public void setGoodDesc(String goodDesc) {
this.goodDesc = goodDesc;
}
public String getGoodDesc() {
return goodDesc;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public Date getCreateTime() {
return createTime;
}
public int getGoodNum() {
return goodNum;
}
public void setGoodNum(int goodNum) {
this.goodNum = goodNum;
}
}
ResultModel:
package cn.pomit.springbootwork.lock.zk.dto;
/**
* @author cff
*/
public class ResultModel {
private String errorCode;
private String message;
private Object data;
public ResultModel() {
}
public ResultModel(String errorCode, String message) {
this.errorCode = errorCode;
this.message = message;
}
public ResultModel(String errorCode, String message, Object data) {
this.errorCode = errorCode;
this.message = message;
this.data = data;
}
public String geterrorCode() {
return errorCode;
}
public void seterrorCode(String errorCode) {
this.errorCode = errorCode;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
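// The original listing showed only ok(String). The factory methods below are
// assumed from how the other classes call ResultModel; the error code values
// are not given in the source, so errorCode is left unset.
public static ResultModel ok() {
return new ResultModel();
}
public static ResultModel ok(Object data) {
ResultModel rm = new ResultModel();
rm.setData(data);
return rm;
}
public static ResultModel error(String message) {
ResultModel rm = new ResultModel();
rm.setMessage(message);
return rm;
}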
}