我正在读取和解码一个二进制文件,一个字节一个字节。为此,我使用两个BackgroundWorker:一个用于读取文件,一个用于为文件的“行”生成可变大小的List<byte>,另一个用于处理“行”。
因为我希望它们在parralel中运行,并且我不知道哪一个比另一个更快,所以我使用Queue在两个BackgroundWorker之间传递数据。
事情是这样的:在任何时候,List<byte>都不应该包含任何0值。在将它们添加到队列之前,我会检查它。不过,在Queue的另一端,有些列表包含0值。然而,在每次调用List<byte>时,我都会创建一个新的Dequeue(),因为很明显,如果我不这样做,数据在处理完成之前就会被修改。
我尝试手动创建一个新的List<byte>对象,然后给它分配Dequeue()的结果,但没有改进。这是我第一次使用Queue,因为我的代码是多线程的,所以几乎不可能一步一步地调试。
Queue<List<byte>> q = new Queue<List<byte>>(); // My FIFO queue
// Reading thread
private void BackgroudWorkerRead_DoWork(object sender, DoWorkEventArgs e)
{
// ... read the file
List<byte> line_list = new List<byte>();
// ... filling line_list with data
// in this part I check that no byte added to line_list has the value 0, or else I display an errror message and end the process
q.Enqueue(line_list);
if (!backgroundWorkerNewLine.IsBusy) backgroundWorkerNewLine.RunWorkerAsync(); // if the other BackgroundWorker isn't processing data, now it needs to since we just added some to the queue
}
// Processing thread
private void backgroundWorkerNewLine_DoWork(object sender, DoWorkEventArgs e)
{
while (q.Count > 0) // While there is data to process
{
string line_str = DecodeBytes(new List<byte>(q.Dequeue())); // Decoding
string[] elements = line_str.Split(separator, StringSplitOptions.None); // Separating values
Form1.ActiveForm.Invoke(new MethodInvoker(() => AddRow(elements))); // Add the line to a DataTable from the main thread
}
}
public string DecodeBytes(List<byte> line)
{
/// ... read each byte and return a string of the whole decoded line
}
public void AddRow(string[] el)
{
MyDataTable.Rows.Add(el);
}q.Dequeue()返回的列表似乎不返回q.Enqueue()添加的相同数据
发布于 2019-05-23 02:23:35
您应该使用微软的Reactive (又名Rx) - NuGet System.Reactive.Windows.Forms (假设您正在编写WinForms应用程序)并添加using System.Reactive.Linq;。
让我们使用熟悉的LINQ语法来处理并行操作。
您还没有向我们展示如何将一个文件分解成一个List<byte>列表,所以我假设您有一个类似于IObservable<List<byte>> DeconstructFile(FileInfo fileInfo)的方法。
现在你可以这样做了:
IObservable<string[]> query =
from bytes in DeconstructFile(new FileInfo("myFile.bin"))
from line_str in Observable.Start(() => DecodeBytes(bytes))
select line_str.Split(separator, StringSplitOptions.None);
IDisposable subscription =
query
.ObserveOn(Form1.ActiveForm)
.Subscribe(el => MyDataTable.Rows.Add(el));就这样。它并行运行,Observable.Start根据需要启动新线程,并自动将结果传递给每个步骤。.ObserveOn(Form1.ActiveForm)自动将.Subscribe封送到UI线程。
如果您需要在代码完成之前停止它,只需调用subscription.Dispose()即可。很简单。
发布于 2019-05-23 03:22:45
在创建多线程应用程序时,必须非常小心地防止不同线程同时访问共享资源。如果你不阻止它,坏事就会发生。您正在丢失更新,您的数据结构已损坏,所有这些都是不可预测的和不一致的。为了避免这些问题,您应该同步从不同线程对共享资源的所有访问。它可以通过使用lock语句来实现。因此,建议是:在读取和更新共享资源时始终锁定。在您的示例中,共享资源是Queue。你应该这样锁:
// Reading thread
lock (q)
{
q.Enqueue(line_list);
}
// Processing thread
while (true)
{
List<byte> list;
lock (q)
{
if (q.Count == 0) break;
list = new List<byte>(q.Dequeue());
}
string line_str = DecodeBytes(list); // Decoding
// ...锁定的缺点是它会产生争用,所以您不应该锁定超过绝对必要的锁。特别是要避免在持有锁的同时进行大量计算。
除此之外,您要实现的模式是生产者-消费者模式,.NET提供了一个专门的类来促进这种模式。它是BlockingCollection类,它为您处理所有这些混乱的线程同步。它可以帮助您减少您必须编写的代码,而代价是一个小的学习曲线。基本上,您需要学习Add、CompleteAdding和GetConsumingEnumerable方法,并且您已经准备好了。
https://stackoverflow.com/questions/56183327
复制相似问题