前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >.NET分布式Orleans - 7 - Streaming

.NET分布式Orleans - 7 - Streaming

作者头像
Chester Chen
发布2024-04-13 20:27:26
700
发布2024-04-13 20:27:26
举报
文章被收录于专栏:chester技术分享chester技术分享
概念

在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。

这些流可以是任何类型的数据,从简单的消息到复杂的事件或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。

每个流都有一个唯一的标识符,称为StreamId,用于区分不同的流。流可以是持久的,也可以是临时的,具体取决于所使用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。

作用

Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:

  1. 解耦:Streaming允许将数据的产生者和消费者解耦。生产者可以发布数据到流中,而消费者可以独立地订阅这些流并处理数据。这种解耦使得系统更加灵活和可扩展。
  2. 实时性:通过Streaming,你可以实时地处理和响应数据流。这对于需要实时分析、监控或响应的场景非常有用。
  3. 故障恢复:Orleans的Streaming机制具有强大的故障恢复能力。即使在出现网络分区或节点故障的情况下,流提供者也能够确保数据的可靠性和一致性。
应用场景
  1. 实时日志分析:你可以将应用程序的日志消息发布到流中,并使用专门的消费者来分析这些日志。这允许你实时地监控和响应应用程序的行为。
  2. 事件驱动架构:在事件驱动架构中,你可以使用Streaming来发布事件,并由多个消费者来处理这些事件。这有助于构建松耦合、可扩展和响应式的系统。
  3. 分布式协作:Streaming也可以用于实现分布式系统中的协作和通信。例如,多个节点可以发布状态更新到流中,其他节点可以订阅这些流以获取最新的状态信息。
示例

安装nuget包

代码语言:javascript
复制
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.0.0" />
代码语言:javascript
复制
配置Streaming
代码语言:javascript
复制
siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
代码语言:javascript
复制
定义一个Grain生成事件
代码语言:javascript
复制
public interface ISender : IGrainWithStringKey
{
    Task Send(Guid rid);
}

public class SenderGrain : Grain, ISender
{
    public Task Send(Guid rid)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        var streamId = StreamId.Create("RANDOMDATA", rid);
        var stream = streamProvider.GetStream<int>(streamId);
        RegisterTimer(_ =>
        {
            return stream.OnNextAsync(Random.Shared.Next());
        }, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
        return Task.CompletedTask;
    }
}

再定义一个Grain订阅事件

代码语言:javascript
复制
public interface IRandomReceiver : IGrainWithGuidKey
{
    Task Receive();
}

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
{
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("StreamProvider");
        var rid = this.GetPrimaryKey();
        var streamId = StreamId.Create("RANDOMDATA", rid);
        var stream = streamProvider.GetStream<int>(streamId);

        await stream.SubscribeAsync<int>(
            async (data, token) =>
            {
                Console.WriteLine(data);
                await Task.CompletedTask;
            });
        base.OnActivateAsync(cancellationToken);
    }
    public async Task Receive()
    {
            
    }
}

然后即可测试

代码语言:javascript
复制
var rid = Guid.NewGuid();
var sender1 = client.GetGrain<ISender>("sender1");
await sender1.Send(rid);
var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
await reciver1.Receive();
流提供程序

提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-03-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 chester技术分享 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概念
  • 作用
  • 应用场景
  • 示例
  • 流提供程序
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档