2012-12-01 42 views
18

目前我正在使用Delphi XE3客户机/服务器应用程序传输文件(使用Indy FTP组件)。客户端部分监视文件夹,获取文件列表,将其上传到服务器并删除原件。上传由一个单独的线程完成,该线程逐个处理文件。这些文件的范围可以从0到几千,它们的大小也会有很大的差异。多线程文件上传同步

这是一个为O​​SX和Windows编译的Firemonkey应用程序,所以我不得不使用TThread而不是OmniThreadLibrary,我首选。我的客户报告应用程序随机冻结。我不能复制它,但由于我对TThread没有太多的经验,我可能会在某处放置死锁状态。我阅读了很多例子,但我仍然不确定一些多线程细节。

的应用程序结构很简单:
在主线程计时器检查的文件夹,并得到有关每个文件到一个记录,这将进入一个通用信息的TList。此列表保留有关文件名称,大小,进度,文件是否已完全上传或必须重试的信息。所有显示在带有进度条的网格中等。此列表仅由主线程访问。 之后,通过调用AddFile方法(下面的代码)将列表中的项目发送到线程。线程将所有文件存储在如下所示的线程安全队列中http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
上传文件时,上传器线程会通过调用Synchronize来通知主线程。
主线程定期调用Uploader.GetProgress方法来检查当前文件的进度并显示它。这个函数实际上并不是线程安全的,但是会导致死锁,还是只返回错误的数据?

什么是安全有效的方法来进行进度检查?

那么这种方法好还是我错过了什么?你会如何做到这一点?
例如我虽然只是为了读取文件夹内容而创建一个新线程。这意味着我使用的TList必须是线程安全的,但必须始终访问TList以刷新GUI网格中显示的信息。不是所有的同步只会减慢GUI?

我已经发布了下面的简化代码以防万一有人想看它。如果没有,我会很乐意听到一些关于我应该使用的一般意见。主要目标是在OSX和Windows上工作;能够显示有关所有文件和当前进度的信息;并且无论文件的数量和大小如何都会有所反应。

这是上传器线程的代码。我已删除了部分的它更容易阅读:

type 
    TFileStatus = (fsToBeQueued, fsUploaded, fsQueued); 
    TFileInfo = record 
    ID: Integer; 
    Path: String; 
    Size: Int64; 
    UploadedSize: Int64; 
    Status: TFileStatus; 
    end; 

    TUploader = class(TThread) 
    private 
    FTP: TIdFTP; 
    fQueue: TThreadedQueue<TFileInfo>; 
    fCurrentFile: TFileInfo; 
    FUploading: Boolean; 
    procedure ConnectFTP; 
    function UploadFile(aFileInfo: TFileInfo): String; 
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
    procedure SignalComplete; 
    procedure SignalError(aError: String); 
    protected 
    procedure Execute; override; 
    public 
    property Uploading: Boolean read FUploading; 
    constructor Create; 
    destructor Destroy; override; 
    procedure Terminate; 
    procedure AddFile(const aFileInfo: TFileInfo); 
    function GetProgress: TFileInfo; 
    end; 

procedure TUploader.AddFile(const aFileInfo: TFileInfo); 
begin 
    fQueue.Enqueue(aFileInfo); 
end; 

procedure TUploader.ConnectFTP; 
begin 
    ... 
    FTP.Connect; 
end; 

constructor TUploader.Create; 
begin 
    inherited Create(false); 
    FreeOnTerminate := false; 
    fQueue := TThreadedQueue<TFileInfo>.Create; 
    // Create the TIdFTP and set ports and other params 
    ... 
end; 

destructor TUploader.Destroy; 
begin 
    fQueue.Close; 
    fQueue.Free; 
    FTP.Free; 
    inherited; 
end; 

// Process the whole queue and inform the main thread of the progress 
procedure TUploader.Execute; 
var 
    Temp: TFileInfo; 
begin 
    try 
    ConnectFTP; 
    except 
    on E: Exception do 
     SignalError(E.Message); 
    end; 

    // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while fQueue.Peek(fCurrentFile) = wrSignaled do 
    try 
     if UploadFile(fCurrentFile) = '' then 
     begin 
     fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
     SignalComplete; 
     end; 
    except 
     on E: Exception do 
     SignalError(E.Message); 
    end; 
end; 

// Return the current file's info to the main thread. Used to update the progress indicators 
function TUploader.GetProgress: TFileInfo; 
begin 
    Result := fCurrentFile; 
end; 

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar 
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64); 
begin 
    fCurrentFile.UploadedSize := AWorkCount; 
end; 

procedure TUploader.SignalComplete; 
begin 
    Synchronize(
    procedure 
    begin 
     frmClientMain.OnCompleteFile(fCurrentFile); 
    end); 
end; 

procedure TUploader.SignalError(aError: String); 
begin 
    try 
    FTP.Disconnect; 
    except 
    end; 
    if fQueue.Closed then 
    Exit; 

    Synchronize(
    procedure 
    begin 
     frmClientMain.OnUploadError(aError); 
    end); 
end; 

// Clear the queue and terminate the thread 
procedure TUploader.Terminate; 
begin 
    fQueue.Close; 
    inherited; 
end; 

function TUploader.UploadFile(aFileInfo: TFileInfo): String; 
begin 
    Result := 'Error'; 
    try 
    if not FTP.Connected then 
     ConnectFTP; 
    FUploading := true; 
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));  
    Result := ''; 
    finally 
    FUploading := false; 
    end; 
end; 

和零件与上传交互的主线程:

...... 
// Main form 
    fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted) 
    fUploader: TUploader;   // The uploader thread 
    fFiles: TList<TFileInfo>; 
    fCurrentFileName: String;  // Used to display the progress 
    function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID 
    public 
    procedure OnCompleteFile(aFileInfo: TFileInfo); 
    procedure OnUploadError(aError: String); 
    end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnUploadError(aError: String); 
begin 
    // show and log the error 
end; 

// This is called by the uploader with Synchronize 
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo); 
var 
    I: Integer; 
begin 
    I := IndexOfFile(aFileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    aFileInfo.Status := fsUploaded; 
    aFileInfo.UploadedSize := aFileInfo.Size; 
    FFiles.Items[I] := aFileInfo; 
    Inc(FFilesUploaded); 
    TFile.Delete(aFileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

procedure TfrmClientMain.ProcessFolder; 
var 
    NewFiles: TStringDynArray; 
    I, J: Integer; 
    FileInfo: TFileInfo; 
begin 
    // Remove completed files from the list if it contains more than XX files 
    while FFiles.Count > 1000 do 
     if FFiles[0].Status = fsUploaded then 
     begin 
     Dec(FFilesUploaded); 
     FFiles.Delete(0); 
     end else 
     Break; 

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories); 
    for I := 0 to Length(NewFiles) - 1 do 
    begin 
      FileInfo.ID := FUniqueID; 
      Inc(FUniqueID); 
      FileInfo.Path := NewFiles[I]; 
      FileInfo.Size := GetFileSizeByName(NewFiles[I]); 
      FileInfo.UploadedSize := 0; 
      FileInfo.Status := fsToBeQueued; 
      FFiles.Add(FileInfo); 

     if (I mod 100) = 0 then 
     begin 
     UpdateStatusLabel; 
     grFiles.RowCount := FFiles.Count; 
     Application.ProcessMessages; 
     if fUploader = nil then 
      break; 
     end; 
    end; 

    // Send the new files and resend failed to the uploader thread 
    for I := 0 to FFiles.Count - 1 do 
     if (FFiles[I].Status = fsToBeQueued) then 
     begin 
     if fUploader = nil then 
      Break; 
     FileInfo := FFiles[I]; 
     FileInfo.Status := fsQueued; 
     FFiles[I] := FileInfo; 
     SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path)); 
     FUploader.AddFile(FFiles[I]); 
     end; 
end; 

procedure TfrmClientMain.tmrGUITimer(Sender: TObject); 
var 
    FileInfo: TFileInfo; 
    I: Integer; 
begin 
    if (fUploader = nil) or not fUploader.Uploading then 
    Exit; 
    FileInfo := fUploader.GetProgress; 
    I := IndexOfFile(FileInfo.ID); 
    if (I >= 0) and (I < fFiles.Count) then 
    begin 
    fFiles.Items[I] := FileInfo; 
    fCurrentFileName := ExtractFileName(FileInfo.Path); 
    colProgressImg.UpdateCell(I); 
    end; 
end; 

function TfrmClientMain.IndexOfFile(aID: Integer): Integer; 
var 
    I: Integer; 
begin 
    Result := -1; 
    for I := 0 to FFiles.Count - 1 do 
    if FFiles[I].ID = aID then 
     Exit(I); 
end; 
+0

我不确定,但没有测试..但你尝试添加TIdAntiFreeze,并检查行为是否相同? (FMX.IdAntiFreeze) – Whiler

+2

TIdAntiFreeze设计用于防止在主线程中使用Indy组件(例如,放在表单上)时冻结GUI。我在一个单独的线程中使用它,所以我不明白它会有什么帮助。至少据我所知... – VGeorgiev

+0

第一次看,你的错误处理看起来不对我。例如,在Execute方法中,如果ConnectFTP调用失败,则会异常(在通知有关错误之后),并且仍然发出对UploadFile的调用。恕我直言,你必须清理它,并让线程在FatalException中死掉,或者正确处理Execute方法中的异常,例如重试连接多次,这可能取决于错误的类型。另一方面,如果你在主线程中有一个列表,我看不出为什么你需要在单个线程中有一个队列。 – jachguate

回答

0

这可能不是问题,但TFileInfo是创纪录的。

这意味着,作为(非const/var)参数传递时,它将被复制。这可能导致记录中的字符串等问题出现问题,这些问题在复制记录时没有更新引用计数。

有一件事要尝试将它作为一个类并传递一个实例作为参数(即指向堆上的数据的指针)。

需要注意的是在线程化32位系统上共享Int64的内容(例如,您的大小值)。

更新/读取这些不是原子地完成的&您没有任何特定的保护,所以有可能由于线程的原因读取值不匹配的高32位和低32位。 (例如,读取高位32位,写入高位32位,写入低位32位,读取低位32位,读取&写入不同的线程)。这可能不会导致您所看到的问题,除非您正在处理大于4GB的文件传输,不会导致您遇到任何问题。

0

死锁肯定很难发现,但这可能是问题所在。 在你的代码中,我没有看到你给enqueue,peek或dequeue添加了任何超时 - 这意味着它将采用默认的Infinite。

排队有这条线 - 就像任何同步对象一样,它会阻塞,直到输入完成(它锁定显示器)或发生超时(因为你没有超时,它会等待永远)

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult; 
...  
if not TMonitor.Enter(FQueue, Timeout) 

我也打算让你实现自己PEEK根据出列的假设 - 只有你没有真正删除该项目。

这似乎是实现自己的超时 - 但是,你仍然有以下几点:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult; 
... 
if not TMonitor.Enter(FQueue, Timeout) 

如果超时是无限的 - 所以,如果你是在偷看方法等待它具有无限发出信号超时,那么你不能阻塞第二个线程中的某些东西,而不会阻止该线程在无限超时等待peek方法完成。

以下是评论的从TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout. 

片断由于实现使用无限默认情况下,不提供TMonitor.Spinlock值,这将阻止线程,直到它可以获取FQueue对象。

我的建议是改变你的代码如下:

// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails 
    while true do 
    case fQueue.Peek(fCurrentFile,10) 
     wrSignaled: 
     try 
      if UploadFile(fCurrentFile) = '' then 
      begin 
      fQueue.Dequeue(Temp); // Delete the item from the queue if succesful 
      SignalComplete; 
      end; 
     except 
      on E: Exception do 
      SignalError(E.Message); 
     end; 
     wrTimeout: sleep(10); 
     wrIOCompletion, 
     wrAbandoned, 
     wrError: break; 
    end; //case 

这样,偷看不会无限期地持有FQueue锁,留下一个窗口入队获得它,并从添加文件主(UI)线程。

+0

感谢您的详细解答。我同意这两个TMonitor.Enter()行可能导致死锁。 TSimpleThreadedQueue.Peek/Dequeue中的TMonitor.Enter()后跟一个TMonitor.Wait()。如果我理解正确,Wait会暂时释放锁,并允许另一个线程在Enqueue方法中放置一个锁,因此它不会导致死锁。等待然后尝试再次放置一个锁。我很少发生死锁,而如果是这样的话,几乎每次都会发生这种情况,因为线程在队列中有任何数据之前就会启动。 – VGeorgiev

+0

Hummm ..查看TMonitor.Enter的源代码,如果您没有设置SpinCount,我不认为是这种情况 - SpinCount = 0时,大部分代码都会跳过SpinCount = 0最终你会得到这个行:结果:= MonitorSupport.WaitOrSignalObject(nil,GetEvent,Timeout)= WAIT_OBJECT_0; – SilverKnight

+0

我相信是这样 - 然而,从我的阅读和试​​图了解显示器的作用,它旋转了一段特定的时间(这应该是非常短的) - 当它变得更长时,那么你有可能的僵局 - 看看这篇关于SpinLock的Wiki文章 - http://en.wikipedia.org/wiki/Spinlock – SilverKnight

0

这可能是一个长镜头,但这是另一种可能性[前面的答案可能更有可能](我刚才碰到的,但之前已知的):使用Synchronize可能导致死锁。下面是一个关于为什么发生这种情况博客: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

从文章的相关点:

线程A调用同步(治法)

线程B调用同步(方法b)

然后,在主线程的上下文中:

主线程在处理消息时调用CheckSynchronize()

CheckSynchronize用于批处理所有正在等待的呼叫(*)。因此,它会逐个拾取等待呼叫(包含MethodA和MethodB)的 队列并循环 。

MethodA在主线程的 上下文中执行。假设治法要求ThreadB.WaitFor

WaitFor的要求 CheckSynchronize时处理任何等待调用同步

从理论上讲,那么这应该处理ThreadB的同步(方法b), 允许线程B来完成。然而,MethodB已经拥有第一个CheckSynchronize调用,所以它永远不会被调用 。

DEADLOCK!

Embarcadero QC article更详细地描述了该问题。

尽管在上面的代码中看不到任何ProcessMessages调用,或者在这种情况下,在Synchronize过程中会调用WaitFor,但在调用同步的时候,可能仍然存在另一个线程也调用同步 - 但主线程已经同步并被阻塞。

这并没有点击我,因为我倾向于避免像鼠疫这样的同步调用,并且通常使用消息传递等其他方法设计UI更新,并使用消息通知而不是同步调用进行线程安全列表。

+0

再次感谢您了解这方面的细节。对于迟到的回复感到抱歉,我现在正在旅行...您在这里描述的内容也出现在我身上,我认为Synchronize是问题所在。我使用它,因为在OSX上没有SendMessage/PostMessage,或者至少我不知道是否有其他选择。所以Synchronize在当时是一个简单的解决方案。前一段时间我重写了很多代码,我不再冻结,但我不知道问题出在哪里。可能与我使用的Indy TCP组件有关,因为它们在OSX上不是很稳定...... – VGeorgiev

+0

没有问题。我正在寻找其他的东西,发现这个帖子没有答案。我用它作为学习练习来了解监视器类将做什么(我从来没有使用它)。我总是对不同的技术感兴趣,这些技术可以改进我的线程代码(主要是降低CPU使用率,但也用不同的通信方法)。这是一堂有趣的课程,希望别人也能从讨论中受益。 – SilverKnight