通过redis查询的数据应该是查询频率可能较高的、允许数据不够准确的(即使数据有一些不准确,但是对整个项目没有严重后果的),甚至这些数据极少改变的。
问题出现:由于需要多台服务器进行并行分流,而服务器之间的session不能共享
解决方案:用redis代替session,因为redis与session同为键值结构,redis内存存储速度快,不同服务器都可从redis集群获取数据
在问题3中,先删除缓存,再操作数据库比先操作数据库再删除缓存出现读脏数据的概率更大入下
先更新数据库再更新缓存:
优点:
保证了数据的一致性,因为数据库始终是数据的最终来源。
缺点:
可能会导致数据不一致
先更新缓存再更新数据库:
优点:
这种策略的优点在于可以提高系统的响应速度,因为用户可以直接从缓存中读取到最新的数据。此外,由于数据库更新是异步进行的,因此不会阻塞主线程,从而提高了系统的并发处理能力。
缺点:
首先,如果数据库更新失败,那么缓存中的数据将与数据库中的数据不一致。其次,由于缓存中的数据是实时更新的,而数据库中的数据可能会稍后更新,因此在数据库和缓存之间会存在一个时间窗口,在这个时间窗口内,数据库和缓存中的数据可能不一致。
暂时无法在飞书文档外展示此内容
如何避免读脏数据?确保查询数据库操作在更新数据库操作之后
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。
缓存空对象(首选)
可以将空对象存到缓存中,并设置有效期
布隆过滤
存在不一定存在,不存在一定不存在
其他方案
缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
给不同的Key的TTL添加随机值
基础时间上加上5以内的随机数
利用Redis集群提高服务的可用性
主从redis服务器避免宕机
给缓存业务添加降级限流策略
直接拒绝服务,保护数据库健康
给业务添加多级缓存
通过其他缓存来弥补
缓存击穿问题也叫热点Key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访回会在瞬间给数据库带来巨大的冲击。
一致性高
不设置TTL,加上一个过期时间字段,但是返回的旧数据,不保证数据一致性,可用性高
流程图:
方案要点:
新建类来保存新字段,可以通过继承原有类,也可通过下面这种方式通过data来指示类,可扩展性高
@Data
public class RedisData {
private LocalDateTime expireTime;
private Object data;
}
/**
* 功能:
* 作者:Mouse_King
* 日期: 2024/2/28 15:03
*/
@Component
@Slf4j
public class CacheClient {
private StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public void set (String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire (String key, Object value, Long time, TimeUnit unit){
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
public <R,ID> R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit unit){
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isNotBlank(json) ){
return JSONUtil.toBean(json,type);
}
if(json != null){
return null;
}
R r = dbFallback.apply(id);
if(r == null){
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
this.set(key,r,time,unit);
return r;
}
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicExpire(String keyPrefix ,ID id,Class<R> type,Function<ID,R> dbFallback,Long time, TimeUnit unit){
String key = keyPrefix + id;
String json = stringRedisTemplate.opsForValue().get(key);
if(StrUtil.isBlank(json) ){
return null;
}
//命中判断过期时间,过期重建缓存
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
if(expireTime.isAfter(LocalDateTime.now())){
//未过期,直接返回
return r;
}
//过期重建缓存
String lockKey = LOCK_SHOP_KEY+id;
boolean isLock = tryLock(lockKey);
if (isLock){
//拿到锁开启独立线程
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
R r1 = dbFallback.apply(id);
this.setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
unLock(lockKey);
}
});
}
return r;
}
private boolean tryLock(String key){
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
//避免空指针,作为基本类型返回
return BooleanUtil.isTrue(aBoolean);
}
private void unLock(String key){
stringRedisTemplate.delete(key);
}
}
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public Result seckillVoucher(Long voucherId) {
//查询
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//判断时间
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀尚未开始");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已经结束");
}
//判断库存是否充足
if(voucher.getStock() < 1){
return Result.fail("库存不足");
}
Long userId = UserHolder.getUser().getId();
//synchronized锁的是对象,toString之后,底层是创建一个新的对象,而intern是根据该值到string常量池中去找,找到了值一样的返回该对象,没有则创建,这里需要锁的是值一样的,而不是
synchronized(userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
@Transactional
public Result createVoucherOrder(Long voucherId){
Long userId = UserHolder.getUser().getId();
//一人一单,联合查询
Integer count = query().eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
if(count > 0){
return Result.fail("该用户已经购买过");
}
//减库存,生成订单
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
分布式锁的核心特性:对线程可见,互斥
通过setnx机制实现
极端情况下会出现以下问题
解决方案
再次出现问题
导致原因:多线程情况下,即使做了Redis的分布式锁,但是由于锁标识的判断和锁的释放不是原子性的,当出现线程一获取锁之后,成功判断锁标识后,因为特殊原因阻塞之后,锁超时释放后,线程二获取锁后,还在执行业务时,线程一恢复正常,因为之前已经判断成功,所以释放了线程二的锁,导致业务出现问题,总之就是解决事务阻塞和锁判断阻塞带来的问题,解决方案分别是设置锁ID和将锁判断和释放变为原子性
最终解决方案
通过lua脚本或者redis的事务锁(不推荐)
/**
* 功能:
* 作者:Mouse_King
* 日期: 2024/3/2 15:22
*/
public class SimpleRedisLock implements ILock{
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
}
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-MemoryDataGrid)。它不仅提供了一系列的分布式的java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现,底层是lua脚本
官网地址:https://redisson.org
GitHub地址:https://github.com/redisson/redisson
依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
配置
@Configuration
public class ReidssonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.146.128:6379").setPassword("123321");
return Redisson.create(config);
}
}
使用
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson
throws InterruptedException {
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("LockName");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock=lock.tryLock(1,10, TimeUnit.SECONDS);
//判断释放获取成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}
}
}
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private IVoucherOrderService proxy;
private BlockingQueue<VoucherOrder> ordersTasks = new ArrayBlockingQueue<>(1024*1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
@Override
public void run() {
while (true){
//获取订单中的信息
try {
VoucherOrder voucherOrder = ordersTasks.take();
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("处理订单异常",e);
}
//创建订单
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
//获取锁失败
log.error("不允许重复下单");
return ;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
//执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString()
);
int r = result.intValue();
if(r != 0){
return Result.fail(r == 1?"库存不足":"不能重复下单");
}
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
ordersTasks.add(voucherOrder);
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(0);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){
Long userId = voucherOrder.getUserId();
//一人一单,联合查询
Integer count = query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
if(count > 0){
log.error("该用户已经购买过");
return;
}
//减库存,生成订单
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock",0)
.update();
if(!success){
log.error("该用户已经购买过");
return;
}
save(voucherOrder);
}
}
以上这方法的消息队列还是依赖jvm,当jvm宕机后,消息队列也会清空,因此需要保证消息队列独立于jvm,可使用redis来实现消息队列(实际生产中使用rabbitMQ),其中Redis有三种方式实现消息队列,可以基于List结构模拟消息队列,PubSub基本的点对点消息队列,Stream实现
stream中的消息可以重复读取,永久存在,
创建消息队列:
redis
.
call
('xadd','stream.orders','*',"userId",userId,'voucherId',voucherId,'id',orderId)
如果像这种三个参数的
读取消息:
BLOCK给0表示永久阻塞
为了解决上述出现的消息漏读情况,可以采用消费者组的方法来解决,通过ACK和自身可以从未消费的消息开始读取的特性,没有ACK的数据都会放入panding-list
VoucherOrderServiceImpl
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private IVoucherOrderService proxy;
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable{
String queueName ="stream.orders";
@Override
public void run() {
while (true){
try {
//获取消息 队列中的订单中信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//判断是否成功
if(list == null || list.isEmpty()){
continue;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//下单
handleVoucherOrder(voucherOrder);
//确认ACK
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
handlePendingList();
}
//创建订单
}
}
private void handlePendingList() {
while (true){
try {
//获取消息 队列中的订单中信息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//判断是否成功
if(list == null || list.isEmpty()){
//获取失败表示pending-list没有异常消息
break;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//下单
handleVoucherOrder(voucherOrder);
//确认ACK
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
} catch (Exception e) {
log.error("处理订单异常",e);
try {
Thread.sleep(50);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
//创建订单
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
//获取锁失败
log.error("不允许重复下单");
return ;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
//执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),
userId.toString(),
String.valueOf(orderId)
);
int r = result.intValue();
if(r != 0){
return Result.fail(r == 1?"库存不足":"不能重复下单");
}
//获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(0);
}
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder){
Long userId = voucherOrder.getUserId();
//一人一单,联合查询
Integer count = query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
if(count > 0){
log.error("该用户已经购买过");
return;
}
//减库存,生成订单
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock",0)
.update();
if(!success){
log.error("该用户已经购买过");
return;
}
save(voucherOrder);
}
}
ReidsConfig
@Configuration
public class ReidsConfig { @Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.146.128:6379").setPassword("123456");
return Redisson.create(config);
}
}
RedisIdWorker
@Component
public class RedisIdWorker {
//开始时间
private static final long BEGIN_TIMESTAMP = 1704067200L;
//序列化的位数
private static final int COUNT_BITS = 32;
@Resource
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix){
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
return timestamp << COUNT_BITS | count;
}
}
/*用户签到*/
@Override
public Result sign() {
Long userID = UserHolder.getUser().getId();
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
String key = USER_SIGN_KEY + userID + keySuffix;
int dayOfMonth = now.getDayOfMonth();
//bitMap数据下标是从0开始的,true就是1,因为Boolean类型比int类型占用的空间更小
stringRedisTemplate.opsForValue().setBit(key,dayOfMonth - 1, true);
return Result.ok();
}
/*签到统计*/
@Override
public Result signCount() {
//获取当前用户信息
Long userID = UserHolder.getUser().getId();
//获取当前时间
LocalDateTime now = LocalDateTime.now();
//根据当前时间设置redis中的键后缀格式
String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));
//设置redis中的键
String key = USER_SIGN_KEY + userID + keySuffix;
//获取当前日期是该月中的第几天
int dayOfMonth = now.getDayOfMonth();
//从0开始往后读取当前日期的位数,得到十进制返回的结果,因为bitfield命令可以同时操作,可返回多个结果
List<Long> result = stringRedisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if(result == null || result.isEmpty()){
return Result.ok(0);
}
//这里只有一次操作,直接取第一个元素就能得到数据
Long num = result.get(0);
//用户一次都没签到
if(num == null || num == 0){
return Result.ok(0);
}
int count = 0;
//一直遍十进制形式的num,与1做且运算,为1表示用户签到,签到天数加一,运算完后向右移一位,如果为0则终止,统计用户连续签到天数
while (true){
if((num & 1) == 0){
break;
} else {
count ++;
}
num >>>= 1;
}
return Result.ok(count);
}
@Override
public Result queryShopByTypeId(Integer typeId, Integer current, Double x, Double y) {
if(x == null || y == null){
// 根据类型分页查询
Page<Shop> page = query()
.eq("type_id", typeId)
.page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));
// 返回数据
return Result.ok(page.getRecords());
}
//得出数据的开端和末端,方便redis模拟分页
int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;
int end = current * SystemConstants.DEFAULT_PAGE_SIZE;
//设置按类型分页查询得到的店铺的键
String key = SHOP_GEO_KEY + typeId;
//查询距离当前位置,5000米以内的点位,并计算出当前点位与指定范围内的点位的距离,limit默认从第一条数据开始,只能指定在哪结束
GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo().search(key, GeoReference.fromCoordinate(x, y),
new Distance(5000),
//GEO中只能指定结束位置,起始需要到list中设置
RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));
//如果没有店铺在当前范围内
if(results == null ){
return Result.ok(Collections.emptyList());
}
//获取真正的点位集合
List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent();
//如果是最后一页,直接返回空列表
if(list.size() <= from){
return Result.ok(Collections.emptyList());
}
//根据集合位置创建id数组,因为redis中存的是id,详细信息是根据id到数据库中去查询得到的
List<Long> ids = new ArrayList<>(list.size());
//创建map数组,存的是范围内的每个点位,与当前点位之间的距离
Map<String,Distance> distanceMap = new HashMap<>(list.size());
//将真正点位的集合手动跳到开端,模拟分页,将列表中的点位放入ids数据,和map数组
list.stream().skip(from).forEach(result ->{
String shopIdStr = result.getContent().getName();
ids.add(Long.valueOf(shopIdStr));
Distance distance = result.getDistance();
distanceMap.put(shopIdStr,distance);
});
//将数组转化为字符串,方便后边的数据库查询
String idStr = StrUtil.join(",",ids);
//这么写的目的是为了方便按照查询到的距离排序,因为直接写类似于where id in ()这种格式的话,查询到的数据不会按照距离排序
List<Shop> shops = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();
//为从数据库查询到的店铺数据,设置距离
for (Shop shop : shops) {
shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());
}
return Result.ok(shops);
}
order by id+字符串
指定了店铺的排序顺序,实际上geo已经返回了距离排序的顺序,因此直接跟着geo返回的id顺序排序即可不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
三种实现方式
优点:收件箱用完即清,节省空间
缺点:如果有多条信息堆积的话,读取延迟高
优点:延时低
缺点:如果发送人粉丝较多,内存占用高
需求:
max: 当前时间戳 | 上一次查询的最小时间戳
min:0
offset: 0 | 在上一次的结果中,与最小值一样的元素的个数
count:3//每页的条数
minTime:本次查询的最小时间戳
offset:zset中与上一次最小值一样的元素的个数
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
Long userId = UserHolder.getUser().getId();
String key = FEED_KEY + userId;
//范围查找,这里的用zse集合作为收件箱,feed流的推模式,降序排序
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);
if(typedTuples == null || typedTuples.isEmpty()){
return Result.ok();
}
long minTime = 0;
int os = 1;
List<Long> ids = new ArrayList<>(typedTuples.size());
//解析数据:blogId,minTime,offset
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples) {
ids.add(Long.valueOf(typedTuple.getValue()));
long time = typedTuple.getScore().longValue();
if(time == minTime){
os++;
} else {
minTime = time;
os = 1;
}
}
//按照指定id排序,由于在zset里面id是根据时间戳来排序的,已经是排好序的了
String idStr = StrUtil.join(",", ids);
List<Blog> blogs = query().in("id",ids)
.last("order by field(id,"+idStr+")").list();
for (Blog blog : blogs) {
queryBlogUser(blog);
isBlogLiked(blog);
}
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}
上述代码在对score值为[5,3,3,3,2,2,1,1]进行分页查询时还是会出现统一数据重复查询的情况,
因此需要做改进,在每次从zset中取出数据后,统计最小的score值,如果最小的score值和传进来的最大score相等,则在传进来的offset上加上这次的数量,作为下一次的offset,如果不相等的话就按上述逻辑,这种情况出现在重复score大于每页的数据数量
粉丝数量多的一般粉丝采用拉模式,活跃用户采用推模式(也就是推拉结合模式,之前有做过详解, 可以去我的如何设计一个博客系统这篇文章去看看),粉丝数量少的直接采用推模式
利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用()()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。