这些场景包括;Redis 的基本缓存使用、Redis 加锁(Redisson 提供了很多锁的方式,这里我们会展示独占锁和无锁化的性能测试)。之后还有一个非常重要的场景是关于 Redis 的发布和订阅。
本节内容会涉及到结合 Spring 框架进行自定义 Bean 对象的注入容器操作,以满足尽可能减少配置的情况下,完成对象的实例化和注入使用。这样的操作非常具有高级编码的实战性,值得大家折腾一下,也能顺便补充 Spring 源码的运用。
本文涉及的工程:
docs/dev-ops 提供了 mysql、redis 安装脚本,和数据初始化操作
开源内存数据存储,被数百万开发人员用作数据库、缓存、流引擎和消息代理。
在互联网应用开发中,Redis 缓存的使用,大部分都是为了保护数据库的。让应用对于非必要的情况下,尽可能减少对数据库的调用。比如一份固定的数据可以放到 Redis 缓存中提供查询,或者需要数据库唯一索引仿重拦截 insert 操作先进行 Redis 布隆过滤器校验,也或者是分布式场景下的加锁处理。这样可以减少了对数据库资源的占用,也提供了接口的响应性能。
同时也还有一些专门针对 Redis 做的技术方案,来提高系统的响应吞吐量和响应性能。如;基于 Redis 内存存储实现的规则引擎、基于 Redis 队列实现的低延迟任务调度、基于 Redis 发布和订阅实现的流程解耦操作等等,都是互联网需求场景中非常常用的技术方案。那么本节小傅哥会模拟出一个订单下单场景,来使用 Redis 缓存、加锁、发布/订阅等功能,为大家展示 Redis 的使用。
在安装执行 docker-compose.yml 脚本之前,你需要先在本地安装 docker 之后 IntelliJ IDEA 打开 docker-compose.yml 文件,如图操作即可安装。
接下来小傅哥会带着大家在模拟的订单场景中,把 Redis 的缓存、加锁、发布/订阅的相关功能依次实现下。
在 app 模块下的 config 中,创建 RedisClientConfigProperties 配置类和 RedisClientConfig 客户端启动类。用于通过 Redisson 创建 Redis 的连接客户端。
redis:
sdk:
config:
host: localhost
port: 6379
password: 123456
pool-size: 10
min-idle-size: 5
idle-timeout: 30000
connect-timeout: 5000
retry-attempts: 3
retry-interval: 1000
ping-interval: 60000
keep-alive: true
Redis 的大部分操作其实都是缓存数据,提高系统的 QPS,在插入、更新、删除(逻辑删)、查询的时候,依赖于 Redis 进行提速操作。
// 设置到缓存,在创建订单完成后写入缓存
redissonService.setValue(orderId, orderEntity);
@Override
public OrderEntity queryOrder(String orderId) {
OrderEntity orderEntity = redissonService.getValue(orderId);
if (null == orderEntity) {
UserOrderPO userOrderPO = userOrderDao.selectByOrderId(orderId);
orderEntity = new OrderEntity();
orderEntity.setUserName(userOrderPO.getUserName());
orderEntity.setUserId(userOrderPO.getUserId());
// 设置到缓存
redissonService.setValue(orderId, orderEntity);
}
return orderEntity;
}
使用 Redis 加分布式锁,也是分布式架构设计中非常常用的手段。常用于的场景包括;流程较长,耗时较多的个人开户、下单行为。也包括;一些资源竞争时加分布式锁,排队处理请求。但对于资源竞争的这类库存占用,如果加分布式锁是非常影响系统的吞吐量的,因为所有的用户都在等待上一个用户做完流程后释放锁的处理,相当于你即使系统是分布式的了,但这里的分布式锁依然会把性能拖慢。所以如图,我们要考虑两种场景不同的加锁方式。
/** 独占锁 */
@Override
public String createOrderByLock(OrderAggregate orderAggregate) {
RLock lock = redissonService.getLock("create_order_lock_".concat(orderAggregate.getSkuEntity().getSku()));
try {
lock.lock();
long decrCount = redissonService.decr(orderAggregate.getSkuEntity().getSku());
if (decrCount < 0) return "已无库存[初始化的库存和使用库存,保持一致。orderService.initSkuCount(\"13811216\", 10000);]";
return createOrder(orderAggregate);
} finally {
lock.unlock();
}
}
/** 分段锁,也可以称为无锁化 */
@Override
public String createOrderByNoLock(OrderAggregate orderAggregate) {
UserEntity userEntity = orderAggregate.getUserEntity();
SKUEntity skuEntity = orderAggregate.getSkuEntity();
// 模拟锁商品库存
long decrCount = redissonService.decr(skuEntity.getSku());
if (decrCount < 0) return "已无库存[初始化的库存和使用库存,保持一致。orderService.initSkuCount(\"13811216\", 10000);]";
String lockKey = userEntity.getUserId().concat("_").concat(String.valueOf(decrCount));
RLock lock = redissonService.getLock(lockKey);
try {
lock.lock();
return createOrder(orderAggregate);
} finally {
lock.unlock();
}
}
此场景的案例会涉及到如何向 Spring 动态注入已经实例化后的 Bean 对象。为什么会出现这个场景呢?
首先 Redis 的发布订阅,简单案例代码如下;
// 创建Redisson客户端
RedissonClient redisson = Redisson.create();
// 获取RTopic对象
RTopic<String> topic = redisson.getTopic("myTopic");
// 发布消息
topic.publish("Hello, Redisson!");
// 添加监听器
topic.addListener(String.class, (channel, msg) -> {
System.out.println("Received message: " + msg);
});
// 关闭Redisson客户端
redisson.shutdown();
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
public @interface RedisTopic {
String topic() default "";
}
快捷键;你可以在 IDEA 工程中,摁2下 Shift 搜索这个类。
源码:cn.bugstack.xfg.dev.tech.infrastructure.trigger.mq#RedisTopicListener02
@Slf4j
@Service
@RedisTopic(topic = "testRedisTopic02")
public class RedisTopicListener02 implements MessageListener<String> {
@Override
public void onMessage(CharSequence channel, String msg) {
log.info("02-监听消息(Redis 发布/订阅): {}", msg);
}
}
源码:cn.bugstack.xfg.dev.tech.config.RedisClientConfig#redissonClient
// 添加监听
String[] beanNamesForType = applicationContext.getBeanNamesForType(MessageListener.class);
for (String beanName : beanNamesForType) {
MessageListener bean = applicationContext.getBean(beanName, MessageListener.class);
Class<?> beanClass = bean.getClass();
if (beanClass.isAnnotationPresent(RedisTopic.class)) {
RedisTopic redisTopic = beanClass.getAnnotation(RedisTopic.class);
RTopic topic = redissonClient.getTopic(redisTopic.topic());
topic.addListener(String.class, bean);
// 动态创建 bean 对象,注入到 spring 容器,bean 的名称为 redisTopic,对象为 RTopic
ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
beanFactory.registerSingleton(redisTopic.topic(), topic);
}
}
@Slf4j
@Repository
public class OrderRepository implements IOrderRepository {
@Resource
private IRedisService redissonService;
@Resource
private IUserOrderDao userOrderDao;
@Resource
private RTopic testRedisTopic;
@Resource(name = "testRedisTopic02")
private RTopic testRedisTopic02;
@Resource(name = "testRedisTopic03")
private RTopic testRedisTopic03;
@Override
public String createOrder(OrderAggregate orderAggregate) {
// 省略...
testRedisTopic02.publish(JSON.toJSONString(orderEntity));
testRedisTopic03.publish(JSON.toJSONString(orderEntity));
return orderId;
}
}
源码:cn.bugstack.xfg.dev.tech.test.domain.OrderServiceTest
@Test
public void test_createOrder() throws InterruptedException {
String sku = RandomStringUtils.randomNumeric(9);
int count = 10000;
orderService.initSkuCount(sku, count);
for (int i = 0; i < count; i++) {
threadPoolExecutor.execute(() -> {
UserEntity userEntity = UserEntity.builder()
.userId("小傅哥")
.userName("xfg".concat(RandomStringUtils.randomNumeric(3)))
.userMobile("+86 13521408***")
.build();
SKUEntity skuEntity = SKUEntity.builder()
.sku(sku)
.skuName("《手写MyBatis:渐进式源码实践》")
.quantity(1)
.unitPrice(BigDecimal.valueOf(128))
.discountAmount(BigDecimal.valueOf(50))
.tax(BigDecimal.ZERO)
.totalAmount(BigDecimal.valueOf(78))
.build();
DeviceVO deviceVO = DeviceVO.builder()
.ipv4("127.0.0.1")
.ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes())
.machine("IPhone 14 Pro")
.location("shanghai")
.build();
long threadBeginTime = System.currentTimeMillis(); // 记录线程开始时间
// 耗时:4毫秒
String orderId = orderService.createOrder(new OrderAggregate(userEntity, skuEntity, deviceVO));
// 耗时:106毫秒
String orderId = orderService.createOrderByLock(new OrderAggregate(userEntity, skuEntity, deviceVO));
// 耗时:4毫秒
String orderId = orderService.createOrderByNoLock(new OrderAggregate(userEntity, skuEntity, deviceVO));
long took = System.currentTimeMillis() - threadBeginTime;
totalExecutionTime.addAndGet(took); // 累加线程耗时
log.info("写入完成 {} 耗时 {} (ms)", orderId, took / 1000);
});
}
new Thread(() -> {
while (true) {
if (threadPoolExecutor.getActiveCount() == 0) {
log.info("执行完毕,总耗时:{} (ms)", (totalExecutionTime.get() / 1000));
log.info("执行完毕,总耗时:{}", "\r\033[31m" + (totalExecutionTime.get() / 1000) + "\033[0m");
break;
}
try {
Thread.sleep(350);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
// 等待
new CountDownLatch(1).await();
}
除了以上这结合业务的功能测试以外,本章还提供了;读写锁、异步锁、信号量、队列、延迟队列的相关测试。
/**
* 延迟队列场景应用;https://mp.weixin.qq.com/s/jJ0vxdeKXHiYZLrwDEBOcQ
*/
@Test
public void test_getDelayedQueue() throws InterruptedException {
RBlockingQueue<Object> blockingQueue = redissonService.getBlockingQueue("xfg-dev-tech-task");
RDelayedQueue<Object> delayedQueue = redissonService.getDelayedQueue(blockingQueue);
new Thread(() -> {
try {
while (true){
Object take = blockingQueue.take();
log.info("测试结果 {}", take);
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
int i = 0;
while (true){
delayedQueue.offerAsync("测试" + ++i, 100L, TimeUnit.MILLISECONDS);
Thread.sleep(1000L);
}
}
cn.bugstack.xfg.dev.tech.test.infrastructure.redis.RedisTest
Redis 的使用还有很多有意思、有价值的场景,如果读者还有好的案例,也可以在源码中提交PR。
- END -