在我的并发项目中,我需要在redis中获取一个值,然后更新它并在redis中设置。像下面的代码一样,我期望的结果应该是3000,但是我得不到正确的结果。序列可能是错误的,可能是GET SET SET或者GET SET SET GET,等等。我怎样才能得到正确的序列和正确的结果?我应该使用一些锁吗?
import * as redis from 'redis';
let config: redis.ClientOpts = {
host: '127.0.0.1',
port: 6379
};
let redisClient: redis.RedisClient = new redis.RedisClient(config);
redisClient.set('num', '0');
(async () => {
for (let i = 0; i < 1000; i++) {
await add ();
}
})();
(async () => {
for (let i = 0; i < 1000; i++) {
await add ();
}
})();
(async () => {
for (let i = 0; i < 1000; i++) {
await add ();
}
})();
// I know incr command, this is just an example.
async function add () {
let numStr: string = await get('num');
let num: number = Number(numStr);
num++;
await set('num', String(num));
console.log(num);
}
async function get (key: string): Promise<string> {
return new Promise<string>((resovle, reject) => {
redisClient.get(key, (err: Error, reply: string) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
})
});
}
async function set (key: string, value: string): Promise<string> {
return new Promise<string>((resovle, reject) => {
redisClient.set(key, value, (err: Error, reply: string) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
})
});
}
发布于 2018-06-01 01:29:27
Redis提供了一种更灵活的方式来使用多执行方法“原子地”执行事务。
假设您正在处理一个银行系统,并且一个客户请求monet转账。您需要先将客户账户中的钱贴现,然后再存入另一个账户。在这些操作之间,其中一个操作可能会失败。因此,我们使用事务。
事务是原子的,这意味着要么所有的操作都发生,要么它们都不发生。数据库具有锁定和并发管理实现,可确保这一点发生,同时不会对系统性能造成严重影响。
示例:
var redis = require("redis"),
client = redis.createClient(), multi;
async function add () {
multi = client.multi();
let numStr: string = await multi.get('num');
let num: number = Number(numStr);
num++;
await multi.set('num', String(num));
multi.exec(function (err, replies) {
console.log(replies);});
}
}
发布于 2018-06-01 17:03:39
我已经解决了,谢谢大家。这是我的代码。参考:https://redis.io/topics/distlock
import * as redis from 'redis';
import * as crypto from 'crypto';
const lockScript: string = 'return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])';
const unlockScript: string = 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
let config: redis.ClientOpts = {
host: '127.0.0.1',
port: 6379
};
let redisClient: redis.RedisClient = new redis.RedisClient(config);
redisClient.set('num', '0');
(async () => {
for (let i = 0; i < 1000; i++) {
await add ();
}
})();
(async () => {
for (let i = 0; i < 1000; i++) {
await add ();
}
})();
(async () => {
for (let i = 0; i < 1000; i++) {
add ();
}
})();
async function add () {
let resourse: string = 'lock:num';
let uniqueStr: string = crypto.randomBytes(10).toString('hex');
await attemptLock(resourse, uniqueStr);
let num: number = Number(await get('num'));
num++;
console.log(num);
await set('num', String(num));
await unlock(resourse, uniqueStr);
}
async function loop (resourse: string, uniqueStr: string, ttl?: string) {
return new Promise<any>((resolve, reject) => {
setTimeout(async () => {
let result: any = await lock(resourse, uniqueStr, ttl);
resolve(!!result);
}, 10);
});
}
async function attemptLock (resourse: string, uniqueStr: string, ttl?: string) {
let result = await loop(resourse, uniqueStr, ttl);
if (result) {
return true;
} else {
return attemptLock(resourse, uniqueStr, ttl);
}
}
async function lock (resourse: string, uniqueStr: string, ttl?: string) {
ttl = ttl ? ttl : '30000';
return new Promise<any>((resolve, reject) => {
redisClient.eval(lockScript, 1, resourse, uniqueStr, ttl, (err: Error, reply: any) =>{
if (err) {
console.error(err);
reject(err);
}
// reply will be nil when the key exists, on the contrary it will be "OK"
resolve(reply);
});
});
}
async function unlock (resourse: string, uniqueStr: string) {
return new Promise<any>((resolve, reject) => {
redisClient.eval(unlockScript, 1, resourse, uniqueStr, (err: Error, reply: any) =>{
if (err) {
console.error(err);
reject(err);
}
resolve(reply);
});
});
}
async function get (key: string): Promise<string> {
return new Promise<string>((resovle, reject) => {
redisClient.get(key, (err: Error, reply: string) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
})
});
}
async function set (key: string, value: string): Promise<string> {
return new Promise<string>((resovle, reject) => {
redisClient.set(key, value, (err: Error, reply: string) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
})
});
}
发布于 2018-06-01 12:52:23
您之所以面临这个问题,是因为您并不是在等待每个IFFE
函数完成。因此,它们是并行运行的,不会给你你想要的结果。
要解决这个问题,您需要在每个IFFE函数上执行await
。现在,只有当async
中的函数存在时,我们才能await
。因此,我已经将IFFE函数转换为普通函数。下面是实现
async function run() {
for (let i = 0; i < 1000; i++) {
await add();
}
}
// I know incr command, this is just an example.
async function add() {
let numStr = await get('num');
let num = Number(numStr);
num++;
await set('num', String(num));
console.log(num);
}
function get(key) {
return new Promise((resovle, reject) => {
redisClient.get(key, (err, reply) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
});
});
}
async function set(key, value) {
return new Promise((resovle, reject) => {
redisClient.set(key, value, (err, reply) => {
if (err) {
console.error(err);
reject(err);
}
resovle(reply);
})
});
}
async function runner() {
await run();
await run();
await run();
}
runner();
https://stackoverflow.com/questions/50628429
复制相似问题