首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在不活动后使用Rx .Net执行方法

在不活动后使用Rx .Net执行方法
EN

Stack Overflow用户
提问于 2019-06-17 21:29:57
回答 1查看 290关注 0票数 0

我有一个类似这样的控制器操作:

代码语言:javascript
复制
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    await _mediator.Send(command);
}

它是在.Net核心中完成的,并且使用MediatR来处理命令。

现在,UpdateDataCommand具有一个整数StationId属性,用于标识桩号。当客户端应用程序通过执行Post调用此方法时,它会更新数据库中的数据。我想使用Rx .Net在等待_mediator.Send(命令)之后以某种方式启动一个计时器。计时器将设置为1分钟。1分钟后,我想调用另一个方法,该方法将在数据库中设置标志,但仅针对此StationId。如果有人使用相同的StationId进行开机自检,计时器应该会自动重置。

在伪代码中看起来像这样:

代码语言:javascript
复制
[HttpPost("Post")]
public async Task Post([FromBody] UpdateDataCommand command)
{
    int stationId = command.StationId;
    // let's assume stationId==2

    //saves data for stationId==2
    await _mediator.Send(command);

    //Start a timer of 1 min
    //if timer fires (meaning the 1 minute has passed) call Method2();
    //if client does another "Post" for stationId==2 in the meantime 
      (let's say that the client does another "Post" for stationId==2 after 20 sec)
      then reset the timer
}

如何使用Reactive Extensions in.Net做到这一点?

UPDATE (@Enigmativity):它仍然不工作,我将计时器设置为10秒,如果你查看输出时间,你会看到我在09:17:49发布了一个帖子(它启动了一个10秒的计时器),然后我在09:17:55发布了一个新的帖子(它已经启动了另一个计时器,但它应该只重置旧的一个),两个计时器都启动了,一个在第一次调用后10秒,另一个在第二次调用后10秒。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-06-18 13:13:13

要使用Rx.Net启动计时器,我们可以调用:

代码语言:javascript
复制
var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
    .Subscribe(
        value =>{    /* ... */ }
    );

要取消此订阅,我们只需稍后处理此订阅:

代码语言:javascript
复制
subscription.Dispose();

问题是如何持久化订阅。一种方法是创建一个SubscriptionManager服务(单例),这样我们就可以调用这样的服务来调度任务,然后稍后在控制器操作中取消它,如下所示:

代码语言:javascript
复制
// you controller class

    private readonly ILogger<HomeController> _logger;       // injected by DI
    private readonly SubscriptionManager _subscriptionMgr;  // injected by DI


    public async Task Post(...)
    {
        ...
        // saves data for #stationId
        // Start a timer of 1 min
        this._subscriptionMgr.ScheduleForStationId(stationId);    // schedule a task that for #stationId that will be executed in 60s
    }


    [HttpPost("/Command2")]
    public async Task Command2(...)
    {
        int stationId =  command.StationId;
        if( shouldCancel ){
            this._subscriptionMgr.CancelForStationId(stationId);  // cancel previous task for #stationId
        }
    }

如果您想在内存中管理订阅,我们可以使用ConcurrentDictionary来存储子订阅:

代码语言:javascript
复制
public class SubscriptionManager : IDisposable
{
    private ConcurrentDictionary<string,IDisposable> _dict;
    private readonly IServiceProvider _sp;
    private readonly ILogger<SubscriptionManager> _logger;

    public SubscriptionManager(IServiceProvider sp, ILogger<SubscriptionManager> logger)
    {
        this._dict= new ConcurrentDictionary<string,IDisposable>();
        this._sp = sp;
        this._logger = logger;
    }

    public IDisposable ScheduleForStationId(int stationId)
    {
        var timeout = 60;
        this._logger.LogWarning($"Task for Station#{stationId} will be exexuted in {timeout}s") ;
        var subscription = Observable.Timer(TimeSpan.FromSeconds(timeout))
            .Subscribe(
                value =>{  
                    // if you need update the db, create a new scope:
                    using(var scope = this._sp.CreateScope()){
                        var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
                        var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                            .FirstOrDefault();
                        station.Note = "updated";
                        dbContext.SaveChanges();
                    }
                    this._logger.LogWarning($"Task for Station#{stationId} has been executed") ;
                },
                e =>{
                    Console.WriteLine("Error!"+ e.Message);
                }
            );
        this._dict.AddOrUpdate( stationId.ToString(), subscription , (id , sub)=> {
            sub.Dispose();       // dispose the old one
            return subscription;
        });
        return subscription;
    }

    public void CancelForStationId(int stationId)
    {
        IDisposable subscription = null;
        this._dict.TryGetValue(stationId.ToString(), out subscription);
        this._logger.LogWarning($"Task for station#{stationId} has been canceled");
        subscription?.Dispose();

        // ... if you want to update the db , create a new scope
        using(var scope = this._sp.CreateScope()){
            var dbContext = scope.ServiceProvider.GetRequiredService<AppDbContext>();
            var station=dbContext.StationStatus.Where(ss => ss.StationId == stationId)
                .FirstOrDefault();
            station.Note = "canceled";
            dbContext.SaveChanges();
            this._logger.LogWarning("The db has been changed");
        }
    }

    public void Dispose()
    {
        foreach(var entry in this._dict){
            entry.Value.Dispose();
        }
    }
}

另一种方法是为任务管理器(如cron)创建一个平面记录,但它根本不使用Rx.NET。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56632370

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档