在我的asp.net webapi (框架.Net 4.7.2)中,我需要调用Redis (使用StackExchange.Redis),以便删除火灾中的密钥并忘记,我正在进行一些压力测试。
当我比较不同的方式来获得最大的速度:
我第一次尝试用Task.Run动作给Redis打电话,但我观察到的问题是,在压力下,我的webapi的记忆一直在上升。内存中满是带有折叠代码的System.Threading.IThreadPoolWorkItem[]对象:
[HttpPost]
[Route("api/values/testpostfireforget")]
public ApiResult<int> DeleteFromBasketId([FromBody] int basketId)
{
var response = new DeleteFromBasketResponse<int>();
var cpt = Interlocked.Increment(ref counter);
Task.Run(async () => {
await db.StringSetAsync($"BASKET_TO_DELETE_{cpt}",cpt.ToString())
.ConfigureAwait(false);
});
return response;
}
因此,我认为,在压力下,我的api一直在内存中执行后台任务,并尽可能快地执行它们,但比传入的请求要少。
因此,我正在寻找一种方法,让只有一个长寿命的后台线程与asp.net webapi一起运行,它可以捕获发送给Redis的命令,并通过流水线执行这些命令。
我在考虑通过实现IHostedService接口来运行后台任务,但在这种情况下,后台任务似乎不会与我当前的http请求共享任何状态。因此,对于预定的后台任务来说,实现IhostedService是很方便的,但在我的情况下不是这样,或者我不知道如何实现.
发布于 2021-12-20 16:53:34
基于StackExchange.Redis文档,您可以使用CommandFlags.FireAndForget
标志:
[HttpPost]
[Route("api/values/testpostfireforget")]
public ApiResult<int> DeleteFromBasketId([FromBody] int basketId)
{
var response = new DeleteFromBasketResponse<int>();
var cpt = Interlocked.Increment(ref counter);
db.StringSet($"BASKET_TO_DELETE_{cpt}", cpt.ToString(), flags: CommandFlags.FireAndForget);
return response;
}
编辑1:基于注释的另一种解决方案
您可以使用pub/sub方法。像这样的事情应该有效:
public class MessageBatcher
{
private readonly IDatabase target;
private readonly BlockingCollection<Action<IDatabaseAsync>> tasks = new();
private Task worker;
public MessageBatcher(IDatabase target) => this.target = target;
public void AddMessage(Action<IDatabaseAsync> task) => tasks.Add(task);
public IDisposable Start(int batchSize)
{
var cancellationTokenSource = new CancellationTokenSource();
worker = Task.Factory.StartNew(state =>
{
var count = 0;
var tokenSource = (CancellationTokenSource) state;
var box = new StrongBox<IBatch>(target.CreateBatch());
tokenSource.Token.Register(b => ((StrongBox<IBatch>)b).Value.Execute(), box);
foreach (var task in tasks.GetConsumingEnumerable(tokenSource.Token))
{
var batch = box.Value;
task(batch);
if (++count == batchSize)
{
batch.Execute();
box.Value = target.CreateBatch();
count = 0;
}
}
}, cancellationTokenSource, cancellationTokenSource.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
return new Disposer(worker, cancellationTokenSource);
}
private class Disposer : IDisposable
{
private readonly Task worker;
private readonly CancellationTokenSource tokenSource;
public Disposer(Task worker, CancellationTokenSource tokenSource) => (this.worker, this.tokenSource) = (worker, tokenSource);
public void Dispose()
{
tokenSource.Cancel();
worker.Wait();
tokenSource.Dispose();
}
}
}
用法:
private readonly MessageBatcher batcher;
ctor(MessageBatcher batcher) // ensure that passed `handler` is singleton and already already started
{
this.batcher= batcher;
}
[HttpPost]
[Route("api/values/testpostfireforget")]
public ApiResult<int> DeleteFromBasketId([FromBody] int basketId)
{
var response = new DeleteFromBasketResponse<int>();
var cpt = Interlocked.Increment(ref counter);
batcher.AddMessage(db => db.StringSetAsync($"BASKET_TO_DELETE_{cpt}", cpt.ToString(), flags: CommandFlags.FireAndForget));
return response;
}
https://stackoverflow.com/questions/70428482
复制