首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用队列在两个BackgroundWorkers之间传递数据

使用队列在两个BackgroundWorkers之间传递数据
EN

Stack Overflow用户
提问于 2019-05-17 09:19:37
回答 2查看 197关注 0票数 1

我正在读取和解码一个二进制文件,一个字节一个字节。为此,我使用两个BackgroundWorker:一个用于读取文件,一个用于为文件的“行”生成可变大小的List<byte>,另一个用于处理“行”。

因为我希望它们在parralel中运行,并且我不知道哪一个比另一个更快,所以我使用Queue在两个BackgroundWorker之间传递数据。

事情是这样的:在任何时候,List<byte>都不应该包含任何0值。在将它们添加到队列之前,我会检查它。不过,在Queue的另一端,有些列表包含0值。然而,在每次调用List<byte>时,我都会创建一个新的Dequeue(),因为很明显,如果我不这样做,数据在处理完成之前就会被修改。

我尝试手动创建一个新的List<byte>对象,然后给它分配Dequeue()的结果,但没有改进。这是我第一次使用Queue,因为我的代码是多线程的,所以几乎不可能一步一步地调试。

代码语言:javascript
运行
复制
Queue<List<byte>> q = new Queue<List<byte>>(); // My FIFO queue

// Reading thread
private void BackgroudWorkerRead_DoWork(object sender, DoWorkEventArgs e)
{
      // ... read the file
      List<byte> line_list = new List<byte>();
      // ... filling line_list with data
      // in this part I check that no byte added to line_list has the value 0, or else I display an errror message and end the process
      q.Enqueue(line_list);
      if (!backgroundWorkerNewLine.IsBusy) backgroundWorkerNewLine.RunWorkerAsync(); // if the other BackgroundWorker isn't processing data, now it needs to since we just added some to the queue
}

// Processing thread
private void backgroundWorkerNewLine_DoWork(object sender, DoWorkEventArgs e)
{
    while (q.Count > 0) // While there is data to process
    {
          string line_str = DecodeBytes(new List<byte>(q.Dequeue())); // Decoding
          string[] elements = line_str.Split(separator, StringSplitOptions.None); // Separating values

          Form1.ActiveForm.Invoke(new MethodInvoker(() => AddRow(elements))); // Add the line to a DataTable from the main thread
    }
}

public string DecodeBytes(List<byte> line)
{
 /// ... read each byte and return a string of the whole decoded line
}

public void AddRow(string[] el)
{
    MyDataTable.Rows.Add(el);
}

q.Dequeue()返回的列表似乎不返回q.Enqueue()添加的相同数据

EN

回答 2

Stack Overflow用户

发布于 2019-05-23 02:23:35

您应该使用微软的Reactive (又名Rx) - NuGet System.Reactive.Windows.Forms (假设您正在编写WinForms应用程序)并添加using System.Reactive.Linq;

让我们使用熟悉的LINQ语法来处理并行操作。

您还没有向我们展示如何将一个文件分解成一个List<byte>列表,所以我假设您有一个类似于IObservable<List<byte>> DeconstructFile(FileInfo fileInfo)的方法。

现在你可以这样做了:

代码语言:javascript
运行
复制
IObservable<string[]> query =
    from bytes in DeconstructFile(new FileInfo("myFile.bin"))
    from line_str in Observable.Start(() => DecodeBytes(bytes))
    select line_str.Split(separator, StringSplitOptions.None);

IDisposable subscription =
    query
        .ObserveOn(Form1.ActiveForm)
        .Subscribe(el => MyDataTable.Rows.Add(el));

就这样。它并行运行,Observable.Start根据需要启动新线程,并自动将结果传递给每个步骤。.ObserveOn(Form1.ActiveForm)自动将.Subscribe封送到UI线程。

如果您需要在代码完成之前停止它,只需调用subscription.Dispose()即可。很简单。

票数 0
EN

Stack Overflow用户

发布于 2019-05-23 03:22:45

在创建多线程应用程序时,必须非常小心地防止不同线程同时访问共享资源。如果你不阻止它,坏事就会发生。您正在丢失更新,您的数据结构已损坏,所有这些都是不可预测的和不一致的。为了避免这些问题,您应该同步从不同线程对共享资源的所有访问。它可以通过使用lock语句来实现。因此,建议是:在读取更新共享资源时始终锁定。在您的示例中,共享资源是Queue。你应该这样锁:

代码语言:javascript
运行
复制
// Reading thread
lock (q)
{
    q.Enqueue(line_list);
}

// Processing thread
while (true)
{
    List<byte> list;
    lock (q)
    {
        if (q.Count == 0) break;
        list = new List<byte>(q.Dequeue());
    }
    string line_str = DecodeBytes(list); // Decoding
    // ...

锁定的缺点是它会产生争用,所以您不应该锁定超过绝对必要的锁。特别是要避免在持有锁的同时进行大量计算。

除此之外,您要实现的模式是生产者-消费者模式,.NET提供了一个专门的类来促进这种模式。它是BlockingCollection类,它为您处理所有这些混乱的线程同步。它可以帮助您减少您必须编写的代码,而代价是一个小的学习曲线。基本上,您需要学习AddCompleteAddingGetConsumingEnumerable方法,并且您已经准备好了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56183327

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档