在Orleans中,Streaming是一组API和功能集,它提供了一种构建、发布和消费数据流的方式。
这些流可以是任何类型的数据,从简单的消息到复杂的事件或数据记录。Streaming API允许你定义、发布和消费这些流,而无需关心底层的传输机制或数据存储。
每个流都有一个唯一的标识符,称为StreamId,用于区分不同的流。流可以是持久的,也可以是临时的,具体取决于所使用的流提供者(Stream Provider)。流提供者负责处理流的存储、传输和故障恢复。
Streaming在Orleans中起到了至关重要的作用,主要体现在以下几个方面:
安装nuget包
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.0.0" />
配置Streaming
siloHostBuilder.AddMemoryStreams("StreamProvider").AddMemoryGrainStorage("PubSubStore");
定义一个Grain生成事件
public interface ISender : IGrainWithStringKey
{
Task Send(Guid rid);
}
public class SenderGrain : Grain, ISender
{
public Task Send(Guid rid)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("RANDOMDATA", rid);
var stream = streamProvider.GetStream<int>(streamId);
RegisterTimer(_ =>
{
return stream.OnNextAsync(Random.Shared.Next());
}, null, TimeSpan.FromMilliseconds(1_000), TimeSpan.FromMilliseconds(1_000));
return Task.CompletedTask;
}
}
再定义一个Grain订阅事件
public interface IRandomReceiver : IGrainWithGuidKey
{
Task Receive();
}
[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver
{
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var rid = this.GetPrimaryKey();
var streamId = StreamId.Create("RANDOMDATA", rid);
var stream = streamProvider.GetStream<int>(streamId);
await stream.SubscribeAsync<int>(
async (data, token) =>
{
Console.WriteLine(data);
await Task.CompletedTask;
});
base.OnActivateAsync(cancellationToken);
}
public async Task Receive()
{
}
}
然后即可测试
var rid = Guid.NewGuid();
var sender1 = client.GetGrain<ISender>("sender1");
await sender1.Send(rid);
var reciver1 = client.GetGrain<IRandomReceiver>(new Guid());
await reciver1.Receive();
提供程序可以通过在nuget种搜索Orleans.Streaming,也可以通过PersistentStreamProvider 与 IQueueAdapter 重写来自定义Provider