2011-12-02 70 views
4

[编辑:感谢MSalters的回答和Raymond Chen的回答InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection,问题解决了,下面的代码工作正常。这应该提供一个在Windows中使用线程池的有趣简单示例]Windows API线程池简单示例

我无法找到以下任务的简单示例。例如,我的程序需要将一个巨大的std :: vector中的值加1,所以我想要并行执行。它需要在程序的整个生命周期内进行很多次。我知道如何在每次调用例程时使用CreateThread,但是我没有设法使用ThreadPool去除CreateThread。

这里是我做的:

class Thread { 
public: 
    Thread(){} 
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread" 
}; 
class IncrementVectorThread: public Thread { 
public: 
    IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { }; 

    virtual void run() { 
     for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++) 
      myvec[i]++; //and let's assume myvec is properly sized 
    } 
    int id, nb; 
    std::vector<int> &myvec; 
}; 

class ThreadGroup : public std::vector<Thread*> { 
public: 
    ThreadGroup() { 
     pool = CreateThreadpool(NULL); 
     InitializeThreadpoolEnvironment(&cbe); 
     cleanupGroup = CreateThreadpoolCleanupGroup(); 
     SetThreadpoolCallbackPool(&cbe, pool); 
     SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL); 
     threadCount = 0; 
    } 
    ~ThreadGroup() { 
     CloseThreadpool(pool); 
} 
    PTP_POOL pool; 
    TP_CALLBACK_ENVIRON cbe; 
    PTP_CLEANUP_GROUP cleanupGroup; 
    volatile long threadCount; 
} ; 


static VOID CALLBACK runFunc(
       PTP_CALLBACK_INSTANCE Instance, 
       PVOID Context, 
       PTP_WORK Work) { 

    ThreadGroup &thread = *((ThreadGroup*) Context); 
    long id = InterlockedIncrement(&(thread.threadCount)); 
    DWORD tid = (id-1)%thread.size(); 
    thread[tid]->run(); 
} 

void run_threads(ThreadGroup* thread_group) { 
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size()); 
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size()); 

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe); 
    thread_group->threadCount = 0; 
    for (int i=0; i<thread_group->size(); i++) { 
     SubmitThreadpoolWork(worker); 
    } 
    WaitForThreadpoolWorkCallbacks(worker,FALSE); 
    CloseThreadpoolWork(worker); 
}  

void main() { 

    ThreadGroup group; 
    std::vector<int> vec(10000, 0); 
    for (int i=0; i<10; i++) 
     group.push_back(new IncrementVectorThread(i, 10, vec)); 

    run_threads(&group); 
    run_threads(&group); 
    run_threads(&group); 

    // now, vec should be == std::vector<int>(10000, 3);  
} 

所以,如果我深知:
- 命令CreateThreadpool创建了一堆线程(因此,调用CreateThreadpoolWork是便宜的,因为它不调用CreateThread)
- 我可以拥有尽可能多的线程池(如果我想为“IncrementVector”创建一个线程池,并为我的“DecrementVector”线程创建一个线程池)。
- 如果我需要将我的“增量向量”任务分成10个线程,而不是调用10次CreateThread,我创建一个“worker”,并使用相同的参数向ThreadPool提交10次(因此,我需要回调中的线程ID来知道std :: vector的哪个部分要增加)。在这里我找不到线程ID,因为函数GetCurrentThreadId()返回线程的真实ID(即,类似1528的东西,而不是0..nb_launched_threads之间的东西)。

最后,我不确定我是否理解这个概念:如果我将std :: vector分成10个线程,我真的需要单个worker而不是10吗?

谢谢!

回答

5

你基本上直到最后一点。

关于线程池的整个想法是,你不关心它有多少个线程。您只需将大量工作投入到线程池中,并让OS确定如何执行每个块。 因此,如果您创建并提交了10个块,操作系统可能会使用池中的1到10个线程。

你不应该关心那些线程标识。不要打扰线程ID,最小或最大线程数或类似的东西。

如果您不关心线程标识,那么您如何管理要更改的向量的哪一部分?简单。在创建线程池之前,将计数器初始化为零。在回调函数中,调用InterlockedIncrement来检索和增加计数器。对于每个提交的工作项目,您将获得一个连续的整数。

+0

谢谢 - upvoted!我做了你的建议,但是用上面的简单代码,最后我仍然没有在向量中得到适当的值:s – WhitAngl

+0

答案重新验证:计数器没有在正确的位置递增:)谢谢! – WhitAngl

+0

对不起,我未验证以重新打开该问题。上面的代码在10000次执行中约有7次未能给出只包含“3”的向量:s – WhitAngl