2017-07-12 69 views
2

考虑一个阻塞函数:this_thread :: sleep_for(milliseconds(3000));RXCPP:阻塞函数超时

我试图获得以下行为:

Trigger Blocking Function    

|---------------------------------------------X 

我想要触发闭锁功能,如果时间过长(超过两秒钟),它应该超时。

我已经做了以下内容:

my_connection = observable<>::create<int>([](subscriber<int> s) { 
    auto s2 = observable<>::just(1, observe_on_new_thread()) | 
    subscribe<int>([&](auto x) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
    }); 
}) | 
timeout(seconds(2), observe_on_new_thread()); 

我不能得到这个工作。对于初学者来说,我认为s不能从另一个线索on_next。

所以我的问题是,这是做什么正确的反应方式?如何在rxcpp中封装一个阻塞函数并为其添加超时?

随后,我想表现得像这样的一个RX流:

Trigger    Cleanup 

|------------------------X 
          (Delay) Trigger   Cleanup 
             |-----------------X 
+0

我认为这可以做的另一种方式是: 'auto connection = timeout.amb(blocking_function_observable)' – jc211

回答

0

大问题!以上非常接近。

以下是如何将阻塞操作调整为rxcpp的示例。它做libcurl polling发出http请求。

下面应该做你想要的。

auto sharedThreads = observe_on_event_loop(); 

auto my_connection = observable<>::create<int>([](subscriber<int> s) { 
     this_thread::sleep_for(milliseconds(3000)); 
     s.on_next(1); 
     s.on_completed(); 
    }) | 
    subscribe_on(observe_on_new_thread()) | 
    //start_with(0) | // workaround bug in timeout 
    timeout(seconds(2), sharedThreads); 
    //skip(1); // workaround bug in timeout 

my_connection.as_blocking().subscribe(
    [](int){}, 
    [](exception_ptr ep){cout << "timed out" << endl;} 
); 
  • subscribe_on将在一个专用的线程运行create,因此create允许阻止线程。
  • timeout将在不同的线程上运行定时器,可以与其他线程共享定时器,并将所有调用转移到同一个线程。
  • as_blocking将确保subscribe直到它完成才会返回。这仅用于防止main()退出 - 通常在测试或示例程序中。

编辑:在timeout中增加了解决方法。目前,它不会安排第一次超时,直到第一个值到达。

编辑-2:timeout错误已修复,解决方法不再需要。

+0

嗨柯克,感谢您对此的意见。我运行了你建议的代码片段。看起来'on_next'在'on_error'之前触发。我怀疑是'this_thread :: sleep_for()'阻止了启动内部定时器的超时。如果's.on_completed'被移除,'on_next'从超时后触发'on_error'。 – jc211

+0

这就是我没有尝试这第一次得到的。我在超时运算符中发现了一个错误,我已经用错误的解决方法更新了答案。 –

+0

测试了解决方法及其工作。谢谢! – jc211