首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >多线程文件上传同步

多线程文件上传同步
EN

Stack Overflow用户
提问于 2012-12-02 01:17:53
回答 2查看 2.5K关注 0票数 21

目前,我正在开发一个用于传输文件的Delphi客户端/服务器应用程序(使用Indy XE3组件)。客户端部分监控一个文件夹,获取其中的文件列表,将它们上传到服务器并删除原始文件。上传是由一个单独的线程完成的,该线程逐个处理文件。文件的范围从0到几千,并且它们的大小也有很大不同。

这是一款同时针对OSX和Windows编译的火猴应用程序,所以我不得不使用TThread而不是我喜欢的OmniThreadLibrary。我的客户报告应用程序随机冻结。我不能复制它,但由于我没有太多使用TThread的经验,我可能在某个地方设置了死锁条件。我读了相当多的例子,但我仍然不确定一些多线程的细节。

应用程序结构很简单:

主线程中的计时器检查文件夹,并将有关每个文件的信息放入记录中,该记录放入通用TList中。此列表保存有关文件的名称、大小、进度、文件是否已完全上载或是否必须重试的信息。在带有进度条的网格中显示的所有内容,等等。此列表仅由主线程访问。之后,通过调用AddFile方法(下面的代码)将列表中的项发送到线程。线程将所有文件存储在一个线程安全的队列中,就像下面这个http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/

上载文件时,上传器线程通过调用同步来通知主线程。

主线程定期调用Uploader.GetProgress方法来检查当前文件进度并显示它。这个函数实际上不是线程安全的,但是它会导致死锁,或者只返回错误的数据吗?

进行进度检查的安全有效的方法是什么?

那么,这种方法是可以的,还是我错过了什么?你会怎么做呢?

例如,我想创建一个新的线程来读取文件夹内容。这意味着我使用的TList必须是线程安全的,但必须始终访问它才能刷新图形用户界面网格中显示的信息。所有的同步不都只会减慢GUI的运行速度吗?

我已经在下面发布了简化的代码,以防有人想看它。如果没有,我很乐意听到一些关于我应该使用什么的意见。主要目标是在OSX和Windows上工作;能够显示所有文件的信息和当前文件的进度;并且无论文件的数量和大小如何,都能做出响应。

这是上传器线程的代码。为了更容易阅读,我删除了其中的一些内容:

代码语言:javascript
运行
复制
type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

以及与上传程序交互的主线程的一部分:

代码语言:javascript
运行
复制
......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/13661713

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档