2015-05-13 40 views
1

我需要为无锁定堆栈编写void push(const T& val)实现。 问题是,compare_exchange_weak期望非原子node*,但我必须使用std::atomic<node*> next字段而不是常规node* next。 我试图这样做C++如何锁定自由堆栈推入原子

void push(const T& val) { 
    node* new_node = new node(val); 
    node* local_next = new_node->next.load(); 
    while (!head.compare_exchange_weak(local_next, new_node)); 
} 

但如果创造让local_next事情更糟,解决这个问题。我测试了两种代码。第一个有非原子字段node* next,我在下面的测试代码中丢失了大约20-30个元素。而使用第二个变体,我陷入了僵局。

测试代码:

#include <iostream> 
#include <thread> 
#include <atomic> 
#include "lock_free_stack.h" 

using namespace std; 

void test(lock_free_stack<int>& st, atomic<int>& sum) { 
    st.push(1); 
    shared_ptr<int> val(st.pop()); 
    while (val == nullptr) { } 
    sum.store(sum.load() + *val); 
} 

int main(int argc, const char * argv[]) { 
    atomic<int> sum; 
    sum.store(0); 
    for (int i = 0; i < 100; ++i) { 
     lock_free_stack<int> st; 
     thread t1(test, ref(st), ref(sum)); 
     thread t2(test, ref(st), ref(sum)); 
     thread t3(test, ref(st), ref(sum)); 
     thread t4(test, ref(st), ref(sum)); 
     thread t5(test, ref(st), ref(sum)); 
     thread t6(test, ref(st), ref(sum)); 
     thread t7(test, ref(st), ref(sum)); 
     thread t8(test, ref(st), ref(sum)); 
     t1.join(); 
     t2.join(); 
     t3.join(); 
     t4.join(); 
     t5.join(); 
     t6.join(); 
     t7.join(); 
     t8.join(); 
    } 
    if (sum.load() == 800) { 
     cout << "CORRECT" << endl; 
    } else { 
     cout << "TIME TO REWRITE STACK " << sum << endl; 
    } 
    return 0; 
} 

而且我的无锁堆栈的代码(第一变体):

#ifndef lock_free_stack_hard_lock_free_stack_h 
#define lock_free_stack_hard_lock_free_stack_h 

template <typename T> 
class lock_free_stack { 
private: 
    struct node { 
     node* next; 
     std::shared_ptr<T> value; 
     node (const T& val) : value(std::make_shared<T>(val)) { } 
    }; 
    std::atomic<node*> head; 
    std::shared_ptr<T> default_value; 

public: 
    lock_free_stack() : head(nullptr), default_value(std::make_shared<T>()) { } 

    void push(const T& val) { 
     node* new_node = new node(val); 
     new_node->next = head.load(); 
     while (!head.compare_exchange_weak(new_node->next, new_node)); 
    } 

    std::shared_ptr<T> pop() { 
     node* old_head = head.load(); 
     while (old_head && !head.compare_exchange_weak(old_head, old_head->next)); 
     if (old_head) { 
      return old_head->value; 
     } else { 
      return std::shared_ptr<T>(); 
     } 
    } 
}; 

#endif 

而第二个变型:

#ifndef lock_free_stack_hard_lock_free_stack_h 
    #define lock_free_stack_hard_lock_free_stack_h 

    template <typename T> 
    class lock_free_stack { 
    private: 
     struct node { 
      std::atomic<node*> next; 
      std::shared_ptr<T> value; 
      node (const T& val) : value(std::make_shared<T>(val)) { } 
     }; 
     std::atomic<node*> head; 
     std::shared_ptr<T> default_value; 

    public: 
     lock_free_stack() : head(nullptr), default_value(std::make_shared<T>()) { } 

     void push(const T& val) { 
      node* new_node = new node(val); 
      new_node->next = head.load(); 
      node* local_next = new_node->next.load(); 
      while (!head.compare_exchange_weak(local_next, new_node)); 
     } 

     std::shared_ptr<T> pop() { 
      node* old_head = head.load(); 
      while (old_head && !head.compare_exchange_weak(old_head, old_head->next)); 
      if (old_head) { 
       return old_head->value; 
      } else { 
       return std::shared_ptr<T>(); 
      } 
     } 
    }; 

    #endif 

所以最终的问题是如何正确创建local_next

谢谢。与测试

+0

['compare_exchange_weak'](http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange)的例子实际上实现了一个无锁'stack :: push'。 – Barry

+0

@Barry,字段'node * next'实际上应该看起来像'std :: atomic next' – Maksim

+0

你可能会碰到一个编译器错误:[msvc](https://connect.microsoft.com/VisualStudio/feedback/details/819819/std-atomic-compare-exchange-weak-has-spurious-write-which-cause-race-conditions),[clang](http://lists.cs.uiuc.edu/pipermail/llvmbugs/ 2014-February/032833.html)和[gcc](https://gcc.gnu.org/bugzilla/show_bug.cgi?id=60272)在编译compare_exchange_ *时都遇到了问题。请参阅cppreference.com上的[示例](http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange)。 – programmerjake

回答

1

的一个问题是线sum.store(sum.load() + *val);

使用原子OPS,如sum += *val;

+0

谢谢,但是,如何用'std :: atomic '接下来实现这个呢? – Maksim

0

第一个变种是垃圾,因为你不能保证商店原子节点::下一个指针。有可能使用带有非原子下一级指针的内存栅栏/栅栏,但我不相信这样的实现。 你的第二个变体更接近正确的实现。 但是你忘了从推()CAS循环最重要的事情:

void push(const T& val) { 
    node* new_node = new node(val); 
    new_node->next = head.load(); 
    node* local_next = new_node->next.load(); 
    while (!head.compare_exchange_weak(local_next, new_node)); 
} 

下面的代码分配新的节点,加载和存储head指针new_node->next。接下来,代码将已知栈head的指针值保存为local_next。 (不必要的步骤)然后代码尝试更新堆栈headnew_node而不更新new_node->next。如果单核线程运行单线程而没有抢占,CAS就会成功100%,这将会很好。 ;)

当CAS失败时,它加载的head当前新鲜参数为local_next和代码滞留在无限循环,因为new_node绝不等于local_next。所以你得到了最后一部分错误。

要使功能CAS循环失败,线程必须重新加载并重新计算它试图更新的数据。这意味着您在重新尝试CAS之前必须从head更新new_node->next。 这并不能解决CAS循环的ABA问题,但是我将其从我的答案中排除。我建议阅读更多关于CAS手术及其缺陷的文章。

由于CAS操作执行加载操作,修复非常简单,只需在CAS失败后存储local_nextnew_node->next即可。 这是更有效的(未经测试)版本:

node* new_node = new node(val); 
node* local_head = head.load(); 
new_node->next.store(local_head); 
while(!head.compare_exchange_weak(local_head, new_node) { 
    new_node->next.store(local_head); 
} 

你会做similiar事情你pop()方法的实现。