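I'm trying to use Prisma transactions in a NestJS project, but I can't figure out a clean way to do the following:
Have one service that calls other services and binds all of them to a single transaction. Example: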
@Injectable()
export class OrdersService {
  constructor(private prismaService: PrismaService, ...) {}

  async someFn() {
    return await this.prismaService.$transaction(async (prismaServiceBoundToTransaction): Promise<any> => {
      await this.userService.update() // This will perform an update using prismaService internally
      await this.otherService.delete() // Again, it'll use prismaService
    });
  }
}
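In this case, both the user service and the other service use their own Prisma service and are not bound to the transaction.
Is there a way to accomplish this without passing prismaServiceBoundToTx to every method?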
Posted on 2022-09-14 12:14:04
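While looking for a suitable solution, the main problem I ran into is that the Prisma client inside the lambda of an interactive transaction is not a full-fledged client but a Prisma.TransactionClient, which lacks the $on, $connect, $disconnect, $use and $transaction methods. If Prisma provided a full client there, all you would have to do is write the transaction like this: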
**THIS DOES NOT WORK BECAUSE prismaServiceBoundToTransaction IS JUST OF TYPE Prisma.TransactionClient!!!**
return await this.prismaService.$transaction(async (prismaServiceBoundToTransaction): Promise<any> => {
  const userService = new UserService(prismaServiceBoundToTransaction)
  const otherService = new OtherService(prismaServiceBoundToTransaction)
  // The following calls will use prismaServiceBoundToTransaction internally
  await userService.update()
  await otherService.delete()
});
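Of course, the above only works if UserService and OtherService are stateless.
So for my solution I created a new interface that offers all the methods of Prisma.TransactionClient plus a custom method for creating a transaction. All services, such as your UserService, only ever receive this exact interface, so they cannot call $transaction, only my interactiveTransaction method!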
export interface PrismaClientWithCustomTransaction
  extends Readonly<Prisma.TransactionClient> {
  interactiveTransaction<F>(
    fn: (prisma: Prisma.TransactionClient) => Promise<F>,
    options?: {
      maxWait?: number | undefined;
      timeout?: number | undefined;
      isolationLevel?: Prisma.TransactionIsolationLevel | undefined;
    }
  ): Promise<F>;
}
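Then we create a concrete class, TransactionalPrismaClient, that implements this interface by receiving a Prisma.TransactionClient in its constructor and forwarding all of its methods. In addition, we implement the interactiveTransaction method by simply executing the lambda with the Prisma.TransactionClient.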
export class TransactionalPrismaClient<
  T extends Prisma.PrismaClientOptions = Prisma.PrismaClientOptions,
  U = 'log' extends keyof T
    ? T['log'] extends Array<Prisma.LogLevel | Prisma.LogDefinition>
      ? Prisma.GetEvents<T['log']>
      : never
    : never,
  GlobalReject extends
    | Prisma.RejectOnNotFound
    | Prisma.RejectPerOperation
    | false
    | undefined = 'rejectOnNotFound' extends keyof T
    ? T['rejectOnNotFound']
    : false
> implements PrismaClientWithCustomTransaction
{
  constructor(private readonly transactionalClient: Prisma.TransactionClient) {}

  $executeRaw<T = unknown>(
    query: TemplateStringsArray | Prisma.Sql,
    ...values: any[]
  ): PrismaPromise<number> {
    return this.transactionalClient.$executeRaw(query, ...values);
  }

  $executeRawUnsafe<T = unknown>(
    query: string,
    ...values: any[]
  ): PrismaPromise<number> {
    return this.transactionalClient.$executeRawUnsafe(query, ...values);
  }

  $queryRaw<T = unknown>(
    query: TemplateStringsArray | Prisma.Sql,
    ...values: any[]
  ): PrismaPromise<T> {
    return this.transactionalClient.$queryRaw(query, ...values);
  }

  $queryRawUnsafe<T = unknown>(
    query: string,
    ...values: any[]
  ): PrismaPromise<T> {
    return this.transactionalClient.$queryRawUnsafe(query, ...values);
  }

  get otherEntity(): Prisma.OtherEntityDelegate<GlobalReject> {
    return this.transactionalClient.otherEntity;
  }

  get userEntity(): Prisma.UserEntityDelegate<GlobalReject> {
    return this.transactionalClient.userEntity;
  }

  // We are already inside a transaction, so just run the lambda with the bound client.
  async interactiveTransaction<F>(
    fn: (prisma: Prisma.TransactionClient) => Promise<F>,
    options?: {
      maxWait?: number | undefined;
      timeout?: number | undefined;
      isolationLevel?: Prisma.TransactionIsolationLevel | undefined;
    }
  ): Promise<F> {
    return await fn(this.transactionalClient);
  }
}
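To see the forwarding idea in isolation, here is a minimal sketch that runs without Prisma. TxClient, ClientWithCustomTransaction, TransactionalClient and makeFakeTx are hypothetical stand-ins I made up for illustration (they are not Prisma types), and the in-memory userEntity.create only mimics a model delegate. It shows that a service depending on the narrow interface ends up using the already-bound client when it asks for a transaction.

```typescript
// Hypothetical stand-in for Prisma.TransactionClient (not the real Prisma type).
interface TxClient {
  userEntity: {
    create(data: { name: string }): Promise<{ id: number; name: string }>;
  };
}

// Narrow interface: everything on the tx client plus a custom transaction method.
interface ClientWithCustomTransaction extends Readonly<TxClient> {
  interactiveTransaction<F>(fn: (tx: TxClient) => Promise<F>): Promise<F>;
}

// Wraps a client that is already bound to a running transaction.
class TransactionalClient implements ClientWithCustomTransaction {
  constructor(private readonly tx: TxClient) {}

  // Forward the delegate unchanged.
  get userEntity() {
    return this.tx.userEntity;
  }

  // No new transaction is opened: the callback joins the existing one.
  async interactiveTransaction<F>(fn: (tx: TxClient) => Promise<F>): Promise<F> {
    return fn(this.tx);
  }
}

// A service that only knows the narrow interface, so it cannot call $transaction.
class UserService {
  constructor(private readonly prisma: ClientWithCustomTransaction) {}

  createUser(name: string) {
    return this.prisma.interactiveTransaction((tx) =>
      tx.userEntity.create({ name })
    );
  }
}

// In-memory fake that records every create call made through it.
function makeFakeTx(): TxClient & { created: string[] } {
  const created: string[] = [];
  return {
    created,
    userEntity: {
      async create(data: { name: string }) {
        created.push(data.name);
        return { id: created.length, name: data.name };
      },
    },
  };
}
```

Constructing `new UserService(new TransactionalClient(makeFakeTx()))` and calling `createUser` routes the write through the one fake client, which is the property the real wrapper relies on for nested service calls.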
In your PrismaService we also need to implement the interactiveTransaction method, so that it satisfies the PrismaClientWithCustomTransaction interface we defined.
@Injectable()
export class PrismaService
  extends PrismaClient
  implements OnModuleInit, PrismaClientWithCustomTransaction
{
  private readonly logger = new ConsoleLogger(PrismaService.name);

  async onModuleInit() {
    this.logger.log('Trying to connect to db.');
    await this.$connect();
  }

  async enableShutdownHooks(app: INestApplication) {
    this.$on('beforeExit', async () => {
      await app.close();
    });
  }

  async interactiveTransaction<R>(
    fn: (prisma: Prisma.TransactionClient) => Promise<R>,
    options?: {
      maxWait?: number | undefined;
      timeout?: number | undefined;
      isolationLevel?: Prisma.TransactionIsolationLevel | undefined;
    },
    numRetries = 1
  ): Promise<R> {
    for (let i = 0; i < numRetries; i++) {
      try {
        // Await inside the try block so a rejected transaction is actually caught here.
        return await this.$transaction(fn, options);
      } catch (e) {
        if (e instanceof Prisma.PrismaClientKnownRequestError) {
          // TODO? Known request errors fall through to the next retry.
        } else {
          throw e;
        }
      }
    }
    throw new Error(
      'No result in transaction after maximum number of retries.'
    );
  }
}
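The retry loop only works if the promise is awaited inside the try block; otherwise the rejection bypasses the catch and goes straight to the caller. As a rough, Prisma-free sketch of that pattern: withRetries and its isRetryable predicate are hypothetical names of mine, and in the service above the predicate would correspond to the `e instanceof Prisma.PrismaClientKnownRequestError` check.

```typescript
// Generic retry helper (hypothetical; not part of Prisma or NestJS).
async function withRetries<R>(
  run: () => Promise<R>,
  isRetryable: (e: unknown) => boolean,
  numRetries = 1
): Promise<R> {
  for (let attempt = 0; attempt < numRetries; attempt++) {
    try {
      // `await` is essential: without it the rejection escapes this try/catch.
      return await run();
    } catch (e) {
      if (!isRetryable(e)) {
        throw e; // unexpected errors are rethrown immediately
      }
      // Retryable error: fall through and try again.
    }
  }
  throw new Error('No result in transaction after maximum number of retries.');
}
```

Returning directly from the try block also avoids checking the result against null, which would wrongly rerun a transaction whose callback legitimately resolves to null or undefined.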
Because our services now expect the PrismaClientWithCustomTransaction interface, NestJS's automatic injection no longer works, and we have to provide PrismaService using a token:
providers: [
  {
    provide: 'PRISMA_SERVICE_TOKEN',
    useClass: PrismaService,
  },
],
export class UserService {
  constructor(@Inject('PRISMA_SERVICE_TOKEN') private readonly prisma: PrismaClientWithCustomTransaction) {}
}
With that in place, we can now do the following:
@Injectable()
export class OrdersService {
  constructor(
    @Inject('PRISMA_SERVICE_TOKEN')
    private readonly prisma: PrismaClientWithCustomTransaction, ...) {}

  async someFn() {
    return await this.prisma.interactiveTransaction(
      async (client) => {
        // You can still use client directly if you don't need nested transaction logic:
        // return client.userEntity.create(...)

        // Or create services for nested usage:
        const transactionalClient = new TransactionalPrismaClient(client);
        const userService = new UserService(transactionalClient);
        return userService.createUser(...);
      },
      { isolationLevel: Prisma.TransactionIsolationLevel.RepeatableRead }
    );
  }
}
If you need $on, $connect, $disconnect or $use, you can of course still inject the original PrismaService through its regular interface.
https://stackoverflow.com/questions/72998476