2017-03-07 34 views
9

我想使用Rayon的par_iter()来优化我的功能。Rayon中的每个线程的初始化

的单线程的版本是这样的:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.iter().map(|tx| { 

     tx.verify_and_store(store) 

    }).collect(); 

    ... 
} 

每个Store实例必须由一个线程使用,但只能中Store多个实例可以同时使用,这样我就可以使通过clone -ing这个多线程store

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 

    let result = txs.par_iter().map(|tx| { 

     let mut local_store = store.clone(); 

     tx.verify_and_store(&mut local_store) 

    }).collect(); 

    ... 
} 

然而,这种克隆store迭代,这是WA太慢了。我想每个线程使用一个商店实例。

人造丝这可能吗?还是应该采取手动线程和工作队列?

回答

5

它可以使用线程局部变量,以确保local_store没有在给定的线程创建不止一次。

例如,该编译(full source):然而

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::cell::RefCell; 
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None)); 

    let mut result = Vec::new(); 

    txs.par_iter().map(|tx| { 
     STORE.with(|cell| { 
      let mut local_store = cell.borrow_mut(); 
      if local_store.is_none() { 
       *local_store = Some(store.clone()); 
      } 
      tx.verify_and_store(local_store.as_mut().unwrap()) 
     }) 
    }).collect_into(&mut result); 
} 

有两个问题与此代码。一,如果store克隆需要做一些事情的时候par_iter()完成后,诸如冲洗自己的缓冲区,它根本不会发生 - 他们Drop才会被调用时,人造丝的工作线程退出,甚至is not guaranteed

第二个,也是更严重的问题,是的store的克隆每个工人线程创建一次。如果Rayon缓存其线程池(并且我相信它),这意味着后续与verify_and_store无关的后续调用将继续使用store的最后一个已知克隆,这可能与当前存储无关。

这可以通过代码有点复杂予以纠正:

  • 商店在Mutex<Option<...>>,而不是Option克隆变量,使他们可以通过调用par_iter()线程访问。这将导致每次访问互斥锁,但该锁将是无争议的,因此便宜。

  • 周围使用互斥的Arc为了收集到向量中创建商店克隆的引用。此向量用于在迭代完成后将商店重置为None以清理商店。

  • 将整个调用放在一个不相关的互斥体中,这样两个并行调用verify_and_store最终不会看到对方的存储体克隆。 (如果在迭代之前创建并安装了新的线程池,这可能是可以避免的。)希望这个序列化不会影响verify_and_store的性能,因为每个调用都将使用整个线程池。

结果是不是很漂亮,但它编译,仅使用安全代码,似乎工作:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) { 
    use std::sync::{Arc, Mutex}; 
    type SharedStore = Arc<Mutex<Option<Store>>>; 

    lazy_static! { 
     static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new()); 
     static ref NO_REENTRY: Mutex<()> = Mutex::new(()); 
    } 
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None))); 

    let mut result = Vec::new(); 
    let _no_reentry = NO_REENTRY.lock(); 

    txs.par_iter().map({ 
     |tx| { 
      STORE.with(|arc_mtx| { 
       let mut local_store = arc_mtx.lock().unwrap(); 
       if local_store.is_none() { 
        *local_store = Some(store.clone()); 
        STORE_CLONES.lock().unwrap().push(arc_mtx.clone()); 
       } 
       tx.verify_and_store(local_store.as_mut().unwrap()) 
      }) 
     } 
    }).collect_into(&mut result); 

    let mut store_clones = STORE_CLONES.lock().unwrap(); 
    for store in store_clones.drain(..) { 
     store.lock().unwrap().take(); 
    } 
} 
+1

岂不可惜那里似乎不是是什么范围限定在这个调用(虽然这对于一个体面的案例子集显然是有用的)。 –

+0

@ChrisEmerson是的,我对这个答案感到担忧的是,我无法想象使用安全代码来清理创建的存储(或者在一切完成时运行其他任意命令,比如将其刷新到磁盘)的方法。更糟糕的是,下一次调用'verify_and_store'将继续使用** last **已知的'Store'克隆,这可能与当前的'store'无关。 – user4815162342

+0

谢谢。这可行,但在我个人的情况下,我发现人造丝有'par_chunks'来减少克隆的数量。虽然这可能仍然会导致每个线程有多个克隆,但它没有@ user4815162342描述的范围问题。 – Tomas