🌷🍁 博主猫头虎 带您 Go to New World.✨🍁 🦄 博客首页——猫头虎的博客🎐 🐳《面试题大全专栏》 文章图文并茂🦕生动形象🦖简单易学!欢迎大家来踩踩~🌺 🌊 《IDEA开发秘籍专栏》学会IDEA常用操作,工作效率翻倍~💐 🌊 《100天精通Golang(基础入门篇)》学会Golang语言,畅玩云原生,走遍大小厂~💐
🪁🍁 希望本文能够给您带来一定的帮助🌸文章粗浅,敬请批评指正!🍁🐥
本文探讨了如何实现一个高并发、幂等的计数器服务,该服务用于处理外部的 inc
请求以增加特定视频的播放计数。考虑到网络延迟和重试等因素,该服务需要确保每个请求至少被处理一次,同时避免重复计数。我们使用了 MySQL 用于持久化存储计数数据,并用 Redis 进行幂等性检查。本文通过 Go、Java 和 Python 三种编程语言展示了具体的实现代码,并对核心逻辑进行了详细解释。Java 代码部分更是进行了全流程的展示,包括幂等性检查、数据库更新和已处理请求的记录。这样的设计不仅确保了高并发处理能力,还实现了请求的幂等性。
在分布式系统中,高并发和幂等性是两个非常关键的问题。本文将探讨如何实现一个高并发、幂等的计数器服务。该服务接受外部的 inc
请求,用于增加特定视频的播放计数。由于网络延迟和请求重试等原因,多个相同或不同的 inc
请求可能并发到达服务。因此,服务需要确保每个请求至少被处理一次(at least once),同时避免重复计数。我们将使用 Go、Java 和 Python 来分别演示这一实现。
高并发幂等计数器题目 问题描述: 1.实现一个计数器服务 2.服务接收外部的 inc 请求,每个请求具有全局唯一 request id 和视频 id 3.因为网络和重试的原因,请求可能会重复的到达 4.时序上,多个重复的请求可能并发达到,两次重复请求之间的间隔不可预期 5.需要保证 at least once ,计数值不能丢失 6.可以依赖一些外部组件, mysql redis
request_id
和 video_id
。request_id
,如果已存在,则该请求已被处理。request_id
,则将其存入 Redis,并进行数据库更新操作。// 导入相应的包
import (
"github.com/go-redis/redis/v8"
"database/sql"
// 其他必要的包
)
func incHandler(requestID string, videoID string) string {
if isProcessed(requestID) {
return "Already Processed"
}
// 更新数据库
updateCounter(videoID)
return "OK"
}
func isProcessed(requestID string) bool {
// Redis 检查
val, _ := redisClient.Get(ctx, requestID).Result()
return val != ""
}
func updateCounter(videoID string) {
// MySQL 更新
// 省略具体实现
}
import redis.clients.jedis.Jedis;
import org.springframework.jdbc.core.JdbcTemplate;
// 其他必要的导入
@RestController
public class CounterController {
@Autowired
Jedis jedis;
@Autowired
JdbcTemplate jdbcTemplate;
@RequestMapping("/inc")
public String inc(@RequestParam String requestId, @RequestParam String videoId) {
if (isProcessed(requestId)) {
return "Already Processed";
}
// 更新数据库
updateCounter(videoId);
return "OK";
}
public boolean isProcessed(String requestId) {
// Redis 检查
return jedis.exists(requestId);
}
public void updateCounter(String videoId) {
// MySQL 更新
// 省略具体实现
}
}
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;
@RestController
public class CounterController {
@Autowired
private Jedis jedis;
@Autowired
private JdbcTemplate jdbcTemplate;
@GetMapping("/inc")
public String inc(@RequestParam String requestId, @RequestParam String videoId) {
// Step 1: 幂等性检查
if (isProcessed(requestId)) {
return "Already Processed";
}
// Step 2: 更新数据库
if (updateCounter(videoId)) {
// Step 3: 将 requestId 存入 Redis 以保证幂等性
jedis.set(requestId, "true");
return "OK";
}
return "Failed";
}
private boolean isProcessed(String requestId) {
// 使用 Redis 检查 requestId 是否已处理
return jedis.exists(requestId);
}
private boolean updateCounter(String videoId) {
// 使用 JdbcTemplate 和 MySQL 更新计数器
String query = "SELECT count FROM video_counter WHERE video_id = ?";
Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);
if (count == null) {
// 如果视频尚未有计数,初始化为 1
jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
} else {
// 如果视频已有计数,加 1
jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
}
return true;
}
}
幂等性检查: 使用 Redis 进行了幂等性检查。如果请求已经被处理过(即在 Redis 中有记录),我们就直接返回。
数据库更新: 我们使用 Spring 的 JdbcTemplate 来与 MySQL 进行交互。如果这是一个全新的 video_id,我们将其添加到数据库并初始化计数为 1;否则,我们找到当前计数并加 1。
记录处理过的请求: 最后,我们将处理过的 request_id 添加到 Redis 中,以便进行未来的幂等性检查。
这样,我们就得到了一个高并发、幂等的计数器服务。
如果两个重复的请求,进入到Step1 都没有查到,不是就都进入step2
如果两个相同的 request_id
几乎同时到达,并且都通过了 Step 1 的幂等性检查,那么它们都会进入 Step 2,从而违反了我们想要的幂等性。
在不使用 Redis 或其他外部锁服务的情况下,这个问题变得更加复杂。一种可能的解决方案是使用数据库的原子操作和唯一约束来确保幂等性。具体来说,您可以使用数据库事务来解决这个问题。
processed_requests
)用于存储已经处理过的 request_id
。该表有一个唯一约束在 request_id
上。
request_id
插入到 processed_requests
表中。如果插入成功,说明这是第一次处理该请求,然后再去更新视频的播放计数。如果插入失败(因为唯一约束),则说明该请求已经被处理过,直接返回。
这里是更新后的 Java 代码片段:
import org.springframework.transaction.annotation.Transactional;
// ...
@RestController
public class CounterController {
@Autowired
private JdbcTemplate jdbcTemplate;
@Transactional
@GetMapping("/inc")
public String inc(@RequestParam String requestId, @RequestParam String videoId) {
// Step 1: 幂等性检查和插入
try {
jdbcTemplate.update("INSERT INTO processed_requests(request_id) VALUES(?)", requestId);
} catch (DuplicateKeyException e) {
return "Already Processed";
}
// Step 2: 更新数据库
updateCounter(videoId);
return "OK";
}
private void updateCounter(String videoId) {
String query = "SELECT count FROM video_counter WHERE video_id = ?";
Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);
if (count == null) {
jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
} else {
jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
}
}
}
这样,在并发环境中,由于数据库的唯一约束和事务,我们可以确保每个 request_id
只会被处理一次。这解决了您提到的问题。
如果使用 Redis,一种常见的方法是利用 Redis 的原子操作来解决这个问题。具体来说,可以使用 SETNX
(SET if Not eXists)命令来设置 request_id
的值,并检查操作是否成功。这个命令只有在键不存在时才会设置值,并返回一个标识(通常是 1 表示成功,0 表示失败)。因为 Redis 的这个操作是原子性的,所以即使有多个相同的请求几乎同时到达,也只有一个能够成功设置该 request_id
。
下面是 Java 代码中这部分逻辑的示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import redis.clients.jedis.Jedis;
@RestController
public class CounterController {
@Autowired
private Jedis jedis;
@Autowired
private JdbcTemplate jdbcTemplate;
@GetMapping("/inc")
public String inc(@RequestParam String requestId, @RequestParam String videoId) {
// Step 1: 幂等性检查
if (!setIfNotExists(requestId)) {
return "Already Processed";
}
// Step 2: 更新数据库
updateCounter(videoId);
return "OK";
}
private boolean setIfNotExists(String requestId) {
return jedis.setnx(requestId, "true") == 1;
}
private void updateCounter(String videoId) {
String query = "SELECT count FROM video_counter WHERE video_id = ?";
Integer count = jdbcTemplate.queryForObject(query, new Object[]{videoId}, Integer.class);
if (count == null) {
jdbcTemplate.update("INSERT INTO video_counter(video_id, count) VALUES(?, 1)", videoId);
} else {
jdbcTemplate.update("UPDATE video_counter SET count = ? WHERE video_id = ?", count + 1, videoId);
}
}
}
这里,setIfNotExists
方法使用 Redis 的 SETNX
命令来尝试设置一个 request_id
。如果设置成功(返回 1),则继续更新数据库。如果设置失败(返回 0,表示该 request_id
已经存在),则直接返回 “Already Processed”,实现幂等性。
这样,在高并发环境下,即使多个相同的 request_id
几乎同时到达,也只有一个会被实际处理,其他的都会在 setIfNotExists
阶段被拦截,从而确保了幂等性。
from flask import Flask, request
import redis
import mysql.connector
app = Flask(__name__)
r = redis.Redis(host='localhost', port=6379, db=0)
conn = mysql.connector.connect(user='root', password='password', database='test')
@app.route('/inc', methods=['POST'])
def inc():
request_id = request.form['request_id']
video_id = request.form['video_id']
if r.exists(request_id):
return 'Already Processed'
r.set(request_id, 1)
cursor = conn.cursor()
# 更新 MySQL
# 省略具体实现
return 'OK'
面试官提问:1.我就是同一个时间段大量的请求到来了怎么办? 2.还有这个合适的过期时间怎么合适
当大量请求在同一时间段到来时,一般有以下几种应对方式:
设置合适的过期时间主要依赖于业务需求和系统资源。以下几点可能有助于确定一个合适的过期时间:
综合以上因素,过期时间的设置通常需要在性能和准确性之间找到一个平衡点。这通常需要通过实验和性能测试来不断调整和优化。
如果不使用 Redis,我们有几种其他方法也可以实现高并发和幂等性。
可以在数据库中为 request_id
设置唯一索引。当重复请求到来时,尝试插入相同的 request_id
将会触发数据库错误,从而达到幂等性的目的。
private boolean isProcessed(String requestId) {
try {
jdbcTemplate.update("INSERT INTO request_ids(request_id) VALUES(?)", requestId);
} catch (DuplicateKeyException e) {
return true;
}
return false;
}
在应用级别维护一个已处理 request_id
的集合。这样不需要外部存储,但这种方式不适合分布式系统,因为每个实例都有自己的内存。
private Set<String> processedRequestIds = Collections.synchronizedSet(new HashSet<>());
private boolean isProcessed(String requestId) {
if (processedRequestIds.contains(requestId)) {
return true;
}
processedRequestIds.add(requestId);
return false;
}
使用数据库或者其他分布式锁机制确保同一时间只有一个请求在处理。这种方法可以确保幂等性但可能影响系统性能。
private boolean isProcessed(String requestId) {
if (acquireLock(requestId)) {
// 进行处理
releaseLock(requestId);
return false;
}
return true;
}
使用消息队列(如 Kafka、RabbitMQ 等)确保消息的幂等处理。大多数现代消息队列都提供了这种机制。
在文件系统中为每个 request_id
创建一个唯一文件。通过检查文件存在与否,来确定是否处理了该请求。但这种方法不适合高并发场景。
每种方法都有其优点和局限性。选择哪种方法取决于具体需求、可用资源以及你愿意接受的复杂性。
通过使用 Redis 进行幂等性检查,以及使用 MySQL 进行持久化存储,我们成功地实现了一个高并发、幂等的计数器服务。这种设计能够在高并发条件下保证 at least once 语义,同时也实现了幂等性。
该设计还有进一步优化和扩展的空间,例如,可以加入更多的负载均衡和高可用性特性,或者使用消息队列来进一步解耦生产者和消费者。
感谢您的阅读,希望本文能为您提供有用的信息和启示。如有任何问题或建议,请随时留言。
======= ·