我正在开发一个在后台不断发送数据流的程序,我想让用户设置上传和下载限制的上限。
我已经阅读了token bucket(令牌桶)和leaky bucket(漏桶)算法的相关资料,后者似乎更符合我的需求,因为这里的目标不是最大化利用网络带宽,而是尽可能不引人注目。
然而,我不太确定该如何实现这一点。一种自然的做法是扩展抽象的Stream类,这样就能方便地为现有的数据传输加上限速;但这样做难道不需要额外的线程,以便在接收数据的同时发送数据(漏桶)吗?如果有其他能实现相同功能的方案,任何提示都将不胜感激。
此外,尽管我可以修改程序接收的数据量,但带宽限制在C#级别上的效果如何?计算机是否仍然会接收数据并简单地保存它,从而有效地取消节流效果,还是会等到我要求接收更多数据?
编辑:我对限制传入和传出数据很感兴趣,因为我无法控制流的另一端。
发布于 2017-02-09 18:02:28
基于@0xDEADBEEF的解决方案,我基于Rx调度器创建了以下(可测试的)解决方案:
/// <summary>
/// A pass-through <see cref="Stream"/> that limits throughput to a fixed number of bytes per second.
/// </summary>
/// <remarks>
/// Every byte read or written is added to a running total. On each transfer the stream compares the
/// time that total should have taken at the configured rate with the time measured by the scheduler's
/// stopwatch, and blocks the calling thread for the difference. Reads and writes share the same total.
/// Time is taken from the injected <c>IScheduler</c>, so a test can drive it with a virtual clock.
/// </remarks>
public class ThrottledStream : Stream
{
    private readonly Stream parent;
    private readonly int maxBytesPerSecond;
    private readonly IScheduler scheduler;
    private readonly IStopwatch stopwatch;

    // Total number of bytes charged since the stopwatch was started.
    private long processed;

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/>.
    /// </summary>
    /// <param name="parent">The stream that actually performs the I/O.</param>
    /// <param name="maxBytesPerSecond">Allowed throughput in bytes per second; must be at least 1.</param>
    /// <param name="scheduler">Scheduler that supplies the stopwatch and performs the waits.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="parent"/> or <paramref name="scheduler"/> is null.
    /// </exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxBytesPerSecond"/> is less than 1.
    /// </exception>
    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler)
    {
        if (parent == null)
        {
            throw new ArgumentNullException("parent");
        }
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
        // A rate of zero would make the target time infinite (TimeSpan.FromSeconds then throws
        // inside Throttle), and a negative rate would yield negative target times, i.e. no
        // throttling at all. Reject both here, where the mistake is made.
        if (maxBytesPerSecond < 1)
        {
            throw new ArgumentOutOfRangeException("maxBytesPerSecond", maxBytesPerSecond, "maxBytesPerSecond has to be at least 1.");
        }

        this.maxBytesPerSecond = maxBytesPerSecond;
        this.parent = parent;
        this.scheduler = scheduler;
        stopwatch = scheduler.StartStopwatch();
        processed = 0;
    }

    /// <summary>
    /// Creates a throttled wrapper around <paramref name="parent"/> that uses <c>Scheduler.Immediate</c>.
    /// </summary>
    /// <param name="parent">The stream that actually performs the I/O.</param>
    /// <param name="maxBytesPerSecond">Allowed throughput in bytes per second; must be at least 1.</param>
    public ThrottledStream(Stream parent, int maxBytesPerSecond)
        : this (parent, maxBytesPerSecond, Scheduler.Immediate)
    {
    }

    /// <summary>
    /// Charges <paramref name="bytes"/> against the budget and blocks the caller until the total
    /// transferred so far is no longer ahead of the configured rate.
    /// </summary>
    /// <param name="bytes">Number of bytes just read, or about to be written.</param>
    protected void Throttle(int bytes)
    {
        processed += bytes;

        // Time the transfer should have taken so far versus the time that has actually passed.
        var targetTime = TimeSpan.FromSeconds((double)processed / maxBytesPerSecond);
        var actualTime = stopwatch.Elapsed;
        var sleep = targetTime - actualTime;
        if (sleep > TimeSpan.Zero)
        {
            // The wait is scheduled on the scheduler; the calling thread blocks on the handle
            // until the scheduler reports that the wait has completed.
            using (var waitHandle = new AutoResetEvent(initialState: false))
            {
                scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set());
                waitHandle.WaitOne();
            }
        }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    /// <summary>Flushes the wrapped stream; flushing is not throttled.</summary>
    public override void Flush()
    {
        parent.Flush();
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override long Length
    {
        get { return parent.Length; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override long Position
    {
        get
        {
            return parent.Position;
        }
        set
        {
            parent.Position = value;
        }
    }

    /// <summary>
    /// Reads from the wrapped stream, then throttles by the number of bytes actually returned.
    /// </summary>
    /// <returns>The number of bytes read, as reported by the wrapped stream.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        var read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    /// <summary>Forwards to the wrapped stream; seeking is not throttled.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    /// <summary>
    /// Throttles by <paramref name="count"/> and then writes to the wrapped stream.
    /// </summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        Throttle(count);
        parent.Write(buffer, offset, count);
    }
}
以及一些只需要几毫秒的测试:
[TestMethod]
public void ShouldThrottleReading()
{
    // 1 MiB of 'a'; at one eighth of the payload per second the copy needs 8 virtual seconds.
    var payload = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
    var virtualClock = new TestScheduler();
    var throttledSource = new ThrottledStream(new MemoryStream(payload), payload.Length / 8, virtualClock);
    var sink = new MemoryStream();
    long fourSeconds = TimeSpan.FromSeconds(4).Ticks;
    long eightSeconds = TimeSpan.FromSeconds(8).Ticks;

    var copy = throttledSource.CopyToAsync(sink);

    // No virtual time has passed yet, so the copy must still be pending.
    copy.Wait(10).Should().BeFalse();

    // Half way through.
    virtualClock.AdvanceTo(fourSeconds);
    copy.Wait(10).Should().BeFalse();

    // One tick before the deadline the copy must still be pending.
    virtualClock.AdvanceTo(eightSeconds - 1);
    copy.Wait(10).Should().BeFalse();

    // Exactly at the deadline the copy completes.
    virtualClock.AdvanceTo(eightSeconds);
    copy.Wait(10).Should().BeTrue();
}
[TestMethod]
public void ShouldThrottleWriting()
{
    // 1 MiB of 'a'; at one eighth of the payload per second the copy needs 8 virtual seconds.
    var payload = Enumerable.Repeat((byte)'a', 1024 * 1024).ToArray();
    var virtualClock = new TestScheduler();
    var plainSource = new MemoryStream(payload);
    var throttledSink = new ThrottledStream(new MemoryStream(), payload.Length / 8, virtualClock);
    long fourSeconds = TimeSpan.FromSeconds(4).Ticks;
    long eightSeconds = TimeSpan.FromSeconds(8).Ticks;

    var copy = plainSource.CopyToAsync(throttledSink);

    // No virtual time has passed yet, so the copy must still be pending.
    copy.Wait(10).Should().BeFalse();

    // Half way through.
    virtualClock.AdvanceTo(fourSeconds);
    copy.Wait(10).Should().BeFalse();

    // One tick before the deadline the copy must still be pending.
    virtualClock.AdvanceTo(eightSeconds - 1);
    copy.Wait(10).Should().BeFalse();

    // Exactly at the deadline the copy completes.
    virtualClock.AdvanceTo(eightSeconds);
    copy.Wait(10).Should().BeTrue();
}
发布于 2015-04-05 19:18:20
我想出了arul提到的ThrottledStream-Class的不同实现。我的版本使用WaitHandle和计时器,间隔为1秒:
public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue)
{
    // Stored through the MaxBytesPerSecond property rather than the backing field.
    MaxBytesPerSecond = maxBytesPerSecond;
    parent = parentStream;
    processed = 0;

    // One-second timer; its Elapsed handler starts a new budget window.
    resettimer = new System.Timers.Timer { Interval = 1000 };
    resettimer.Elapsed += resettimer_Elapsed;
    resettimer.Start();
}
/// <summary>
/// Charges <paramref name="bytes"/> against the current one-second budget and, once the budget
/// is used up, blocks the caller until the timer starts the next window.
/// </summary>
/// <param name="bytes">Number of bytes being transferred.</param>
protected void Throttle(int bytes)
{
    bool limitReached;

    // The timer handler takes the same lock, so charging the counter and clearing the handle
    // cannot interleave with the handler's reset-and-signal.
    lock (wh)
    {
        processed += bytes;
        limitReached = processed >= maxBytesPerSecond;
        if (limitReached)
        {
            // The timer signals on every tick, including ticks nobody waited for. Discard such
            // a left-over signal; otherwise this call would return immediately, unthrottled.
            wh.Reset();
        }
    }

    // Wait outside the lock so the timer handler can acquire it and release us.
    if (limitReached)
    {
        wh.WaitOne();
    }
}

/// <summary>
/// Timer callback: starts a new budget window and releases a caller blocked in Throttle.
/// </summary>
private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
{
    lock (wh)
    {
        processed = 0;
        wh.Set();
    }
}
当超过带宽限制时,线程将休眠,直到下一秒开始。不需要计算最佳睡眠时间。
完整实现:
/// <summary>
/// Wraps another <see cref="Stream"/> and limits how many bytes may pass through
/// <c>Read</c> and <c>Write</c> within each one-second window.
/// </summary>
/// <remarks>
/// A timer fires once per second, clears the byte counter and signals a wait handle. A call that
/// brings the counter to or past <see cref="MaxBytesPerSecond"/> blocks on that handle until the
/// next tick. Reads and writes share one counter. Disposing this stream stops the timer and
/// releases a blocked caller; it does not close the wrapped stream.
/// </remarks>
public class ThrottledStream : Stream
{
    #region Properties
    private int maxBytesPerSecond;

    /// <summary>
    /// Number of Bytes that are allowed per second
    /// </summary>
    /// <exception cref="ArgumentException">The value being set is less than 1.</exception>
    public int MaxBytesPerSecond
    {
        get { return maxBytesPerSecond; }
        set
        {
            if (value < 1)
            {
                throw new ArgumentException("MaxBytesPerSecond has to be >0");
            }
            maxBytesPerSecond = value;
        }
    }
    #endregion

    #region Private Members
    // Serializes access to the counter and the handle state between callers and the timer thread.
    private readonly object gate = new object();

    // Bytes charged in the current window; long so that large transfers cannot overflow it.
    private long processed;

    private readonly System.Timers.Timer resettimer;

    // Signaled by the timer at the start of each window; cleared when the budget is used up.
    private readonly ManualResetEvent wh;

    private readonly Stream parent;
    private volatile bool disposed;
    #endregion

    /// <summary>
    /// Creates a new Stream with Databandwith cap
    /// </summary>
    /// <param name="parentStream">The stream that actually performs the I/O.</param>
    /// <param name="maxBytesPerSecond">Allowed bytes per second; must be at least 1.</param>
    /// <exception cref="ArgumentNullException"><paramref name="parentStream"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="maxBytesPerSecond"/> is less than 1.</exception>
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue)
    {
        if (parentStream == null)
        {
            throw new ArgumentNullException("parentStream");
        }

        // Validate before allocating the handle and the timer so a rejected argument leaks nothing.
        MaxBytesPerSecond = maxBytesPerSecond;
        parent = parentStream;
        processed = 0;

        wh = new ManualResetEvent(false);
        resettimer = new System.Timers.Timer();
        resettimer.Interval = 1000;
        resettimer.Elapsed += resettimer_Elapsed;
        resettimer.Start();
    }

    /// <summary>
    /// Charges <paramref name="bytes"/> against the current one-second budget and, once the
    /// budget is used up, blocks the caller until the timer starts the next window.
    /// </summary>
    /// <param name="bytes">Number of bytes transferred; values of zero or less are ignored.</param>
    protected void Throttle(int bytes)
    {
        if (bytes <= 0)
        {
            return;
        }

        lock (gate)
        {
            processed += bytes;

            // Once disposed there is no timer left to release a waiter, so never block.
            if (disposed || processed < maxBytesPerSecond)
            {
                return;
            }

            // Clear the signal left by the previous tick; the next tick sets it again.
            wh.Reset();
        }

        // Wait outside the lock so the timer handler can acquire it and release us.
        wh.WaitOne();
    }

    /// <summary>
    /// Timer callback: starts a new budget window and releases callers blocked in Throttle.
    /// </summary>
    private void resettimer_Elapsed(object sender, ElapsedEventArgs e)
    {
        lock (gate)
        {
            // A tick can still be delivered while the stream is being disposed.
            if (disposed)
            {
                return;
            }
            processed = 0;
            wh.Set();
        }
    }

    // Rejects I/O on a disposed wrapper before the wrapped stream is touched.
    private void EnsureNotDisposed()
    {
        if (disposed)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    #region Stream-Overrides
    // Close() is deliberately not overridden: Stream.Close() calls Dispose(true), which performs
    // the whole cleanup below.

    /// <summary>
    /// Stops the reset timer, releases a caller still blocked in Throttle and frees the wait handle.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !disposed)
        {
            lock (gate)
            {
                disposed = true;
                // No further tick will arrive, so wake anyone still waiting.
                wh.Set();
            }
            resettimer.Stop();
            resettimer.Elapsed -= resettimer_Elapsed;
            resettimer.Dispose();
            wh.Dispose();
        }
        base.Dispose(disposing);
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanRead
    {
        get { return parent.CanRead; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanSeek
    {
        get { return parent.CanSeek; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override bool CanWrite
    {
        get { return parent.CanWrite; }
    }

    /// <summary>Flushes the wrapped stream; flushing is not throttled.</summary>
    public override void Flush()
    {
        parent.Flush();
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override long Length
    {
        get { return parent.Length; }
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override long Position
    {
        get
        {
            return parent.Position;
        }
        set
        {
            parent.Position = value;
        }
    }

    /// <summary>
    /// Reads from the wrapped stream and charges the number of bytes actually returned, not the
    /// size of the caller's buffer.
    /// </summary>
    /// <returns>The number of bytes read, as reported by the wrapped stream.</returns>
    /// <exception cref="ObjectDisposedException">This stream has been disposed.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        EnsureNotDisposed();
        int read = parent.Read(buffer, offset, count);
        Throttle(read);
        return read;
    }

    /// <summary>Forwards to the wrapped stream; seeking is not throttled.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return parent.Seek(offset, origin);
    }

    /// <summary>Forwards to the wrapped stream.</summary>
    public override void SetLength(long value)
    {
        parent.SetLength(value);
    }

    /// <summary>
    /// Charges <paramref name="count"/> bytes and then writes to the wrapped stream.
    /// </summary>
    /// <exception cref="ObjectDisposedException">This stream has been disposed.</exception>
    public override void Write(byte[] buffer, int offset, int count)
    {
        EnsureNotDisposed();
        Throttle(count);
        parent.Write(buffer, offset, count);
    }
    #endregion
}
https://stackoverflow.com/questions/371032
复制相似问题