2012-08-10 57 views
3

我想以多线程方式实现分支和边界搜索。特别是,我想使用async在每个分支中包装搜索调用,然后等待某个线程出现答案,然后退出。 (理想情况下,我想取消其他线程,但线程取消不在标准中)。这里有一些简化代码:使用C++中的futures,异步和线程实现搜索11

#include <iostream> 
#include <random> 
#include <future> 
#include <thread> 

using namespace std; 

mt19937 rng; 
uniform_int_distribution<unsigned> random_binary(0, 1); 

bool search() { 
    return static_cast<bool>(random_binary(rng)); 
} 

#define N 10000 

int main() 
{ 
    rng.seed(42); 

    std::vector<future<bool>> tasks; 

    for (unsigned i=0; i<N; ++i) 
    tasks.push_back(async(launch::async, search)); 

    // Don't want to wait sequentially here. 
    for (unsigned i=0; i<N; ++i) { 
    tasks[i].wait(); 
    if (tasks[i].get()) { 
     cout << "i = " << i << "\n"; 
     break; 
    } 
    } 
    return 0; 
} 

search()是搜索功能。它根据是否找到答案返回true/false。为了说明,我返回一个随机答案。但问题的关键在于调用tasks[i].wait()的for循环。现在,我正在顺序地等待任务完成。相反,我想要做这样的事情:

auto x = wait_for_any(tasks.begin(), tasks.end()); 
x.get(); 
// cancel other threads. 
// Profit? 

什么是实现这一目标的好方法?

+0

顺便说一句,你在UB-土地已经来了由具有多线程调用'random_binary(RNG)',这是_not_线程安全的。 – ildjarn 2012-08-11 02:55:36

回答

8

std::future提供了一个valid()函数,可以让您检查结果是否可用而不会阻塞,因此您可以使用该函数,例如,在繁忙的等待循环:

std::future<bool>* res_future = 0; 
for(size_t i = 0; ; i==tasks.size()?i=0:++i){ 
    // could add a timeout period to not completely busy-wait the CPU 
    if(tasks[i].wait_for(std::chrono::seconds(0)) == std::future_status::ready){ 
    res = &tasks[i]; 
    break; 
    } 
} 

bool res = res_future->get(); 

拟议除了std::future,做出这样的任务更容易,是一个.then(func_obj)方法异步调用func_obj当结果是可用的,在那里你可以设置标志或东西。

我遗憾地不知道有一种方法可能以上述以外的其他方式实施wait_for_any。 :/

template<class FwdIt> 
std::future<bool> wait_for_any(FwdIt first, FwdIt last) 
{ 
    return std::async([=]{ 
    for(FwdIt cur(first); ; cur==last?cur=first:++cur){ 
    // could add a timeout period to not completely busy-wait the CPU 
    if(cur->wait_for(std::chrono::seconds(0)) == std::future_status::ready) 
     return cur->get(); 
    }); 
} 

线程破坏通常通过协作取消来完成。

体育:std::future<T>::get()将自动wait()如果结果不可用。

+0

你的代码比我的代码好得多,但它仍然按顺序同步轮询每个线程。我想知道是否有更多的异步解决方案。 – keveman 2012-08-10 22:55:41

+2

@keveman:我想你可能需要放弃'std :: future' /'std :: async',并用'std :: condition_variable'把自己放在一起。 – Xeo 2012-08-10 23:01:14

+0

@keveman:看看编辑,它现在“更加异步”,因为主线程在请求​​之前不会被阻塞。 ;) – Xeo 2012-08-10 23:20:58

6

安排所有任务访问相同的condition_variable,mutexbool。这可以通过使这些全局变量或成员数据在每个任务运行成员函数的情况下完成,或者可以通过std::ref作为参数在任务创建函数中传递它们。

在开始任何任务之前将bool初始化为not_found。主线程然后启动任务并等待condition_variable。搜索者任务然后搜索。当他们搜索时,他们偶尔会检查bool(可能是原子负载),看看它是否已设置为found。如果有,搜索器线程立即返回。

当一个线程找到结果时,它将bool设置为found并指示condition_variable。这将唤醒主线程并有效地取消其余的搜索任务。然后主线程可以加入,分离,放弃任何搜索任务。如果您没有主要显式加入搜索任务,那么最好是如果您可以安排所有搜索任务在主要退出之前结束。

没有轮询。没有等待死胡同的搜索。唯一特别的部分是弄清楚搜索者任务如何以及经常检查bool。我建议性能测试这部分。

+0

检查'bool'部件对我的用例是一种侵入性。我从单线程代码开始,但想要并行地在分支上运行搜索。我猜这在尝试并行化现有代码库时并不罕见。当然,我会咬紧牙关,做一些侵入式的修改,但是我希望我有thread :: cancel。 – keveman 2012-08-10 23:25:55

+0

@keveman:不,你没有。在没有线程合作的情况下取消线程是没有意义的。如果它锁定了一个互斥体并且因为杀死它而无法解锁它呢?这就是合作取消的目的。 – Xeo 2012-08-10 23:29:43

+0

@Xeo再次看看我的用例,我从顺序代码开始,没有互斥或任何这样的东西。我同意,thread :: cancel可能会导致死锁等,因为被取消的线程可能处于任何状态,应该小心使用。不过,我只是认为thread :: cancel对于移植某些类的顺序程序很有用。 – keveman 2012-08-10 23:43:55

0

我会采取的方法是让主函数创建一个std::promise,然后将其作为参考进行工作的各个线程。然后,每个工作线程将使用他们共享的std::promise在他们得到结果后使用set_value()

哪个工作线程首先到达那里将能够发送结果,其他人将在尝试使用set_value()时得到异常,并且它已被设置。

在下面的源代码框架中,我使用的是std::async,但如果我们可以在不创建额外的std::promise而不用任何东西的情况下剥离自治线程,那将会很好。

因此代码框架看起来是这样的:

#include <iostream> 
#include <thread> // std::thread is defined here 
#include <future> // std::future and std::promise defined here 

bool search(int args, std::promise<int> &p) { 

    // do whatever needs to be done 

    try { 
     p.set_value(args); 
    } 
    catch (std::future_error & e) { 
     // std::future_errc::promise_already_satisfied 
    } 
    return true; 
} 
int main(int argc, char * argv[]) 
{ 
    std::promise<int> myProm;  
    auto fut = myProm.get_future(); 

    for (int i = 1; i < 4; i++) { 
     std::async(std::launch::async, search, i, std::ref(myProm)); 
    } 

    auto retVal = fut.get(); 
    return 0; 
}