我正在考虑使用这样的事务来实现一种分布式锁:
const lockId = 'myLock';
const lockRef = firebaseAdmin.database().ref(`/locks/${lockId}`);
lockRef.transaction(function(current) {
if (current === null) {
return '1';
}
}, function(error, committed) {
if (committed) {
// .... Do the synchronized work I need ...
lockRef.remove();
}
});
我的问题是:只有在数据不存在的情况下,更新函数才会被调用为null吗?
通常,这是实现分布式锁的有效方法吗?
发布于 2018-05-19 00:13:13
事务将在客户端对当前值进行最佳猜测的情况下开始调用。如果客户端内存中没有当前值,那么最好的猜测是没有当前值。
这意味着,如果得到一个null
,就不能保证数据库中不存在任何值。
另请参阅:
发布于 2021-12-29 10:20:52
“你的第一次尝试是行不通的,理由是”FrankvanPuffelen在他们的回答中说。
但要做到这一点是可能的(虽然不是那么简单)。我与不同的边缘情况进行了相当长时间的斗争,最后提出了这个解决方案,它通过了无数不同的测试,从而验证了这可以防止所有可能的竞争条件和死锁:
import crypto from 'crypto';
import { promisify } from 'util';
import * as firebaseAdmin from 'firebase-admin';
const randomBytes = promisify(crypto.randomBytes);
// A string which is stored in the place of the value to signal that the mutex holder has
// encountered an error. This must be unique value for each mutex so that we can distinguish old,
// stale rejection states from the failures of the mutex that we are currently waiting for.
const rejectionSignal = (mutexId: string): string => `rejected${mutexId}`;
const isValidValue = (value: unknown): boolean => {
// `value` could be string in the form `rejected...` which signals failure,
// using this function makes sure we don't return that as "valid" value.
return !!value && (typeof value !== 'string' || !value.startsWith('rejected'));
};
export const getOrSetValueWithLocking = async <T>(id: string, value: T): Promise<T> => {
const ref = firebaseAdmin.database().ref(`/myValues/${id}`);
const mutexRef = firebaseAdmin.database().ref(`/mutexes/myValues/${id}`);
const attemptingMutexId = (await randomBytes(16)).toString('hex');
const mutexTransaction = await mutexRef.transaction((data) => {
if (data === null) {
return attemptingMutexId;
}
});
const owningMutexId = mutexTransaction.snapshot.val();
if (mutexTransaction.committed) {
// We own the mutex (meaning that `attemptingMutexId` equals `owningMutexId`).
try {
const existing = (await ref.once('value')).val();
if (isValidValue(existing)) {
return existing;
}
/*
--- YOU CAN DO ANYTHING HERE ---
E.g. create `value` here instead of passing it as an argument.
*/
await ref.set(value);
return value;
} catch (e) {
await ref.set(rejectionSignal(owningMutexId));
throw e;
} finally {
// Since we own the mutex, we MUST make sure to release it, no matter what happens.
await mutexRef.remove();
}
} else {
// Some other caller owns the mutex -> if the value is not yet
// available, wait for them to insert it or to signal a failure.
return new Promise((resolve, reject) => {
ref.on('value', (snapshot) => {
const val = snapshot.val();
if (isValidValue(val)) {
resolve(val);
} else if (val === rejectionSignal(owningMutexId)) {
reject(new Error('Mutex holder encountered an error and was not able to set a value.'));
} // else: Wait for a new value.
});
});
}
};
我的用例是,我在Vercel中运行了Next.js API路由,其中并行执行无服务器函数的唯一共享状态是一个Firebase实时数据库。
https://stackoverflow.com/questions/50420573
复制相似问题