2011-04-23 74 views
4

我正在写一段代码,当缓冲区(列表)增长到一定的大小时,它将填充mongoDB集合。如何避免使用Scala的Actor时的竞争条件

import scala.actors.Actor 
import com.mongodb.casbah.Imports._ 
import scala.collection.mutable.ListBuffer 

class PopulateDB extends Actor { 
    val buffer = new ListBuffer[DBObject] 
    val mongoConn = MongoConnection() 
    val mongoCol = mongoConn("casbah_test")("logs") 

    def add(info: DBObject = null) { 
    if (info != null) buffer += info 

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) { 
     mongoCol.insert(buffer.toList) 
     buffer.clear 
     println("adding a batch") 
    } 
    } 

    def act() { 
    loop { 
     react { 
     case info: DBObject => add(info) 

     case msg if msg == "closeConnection" => 
      println("Close connection") 
      add() 
      mongoConn.close 
     } 
    } 
    } 
} 

然而,当我运行下面的代码,斯卡拉偶尔会扔在“mongoCol.insert(buffer.toList)”行“ConcurrentModificationException的”。我很确定它与“mongoCol.insert”有关。我想知道代码中是否存在任何根本性错误。或者我应该使用类似Akka的“atomic {...}”来避免这个问题。

下面是完整的堆栈跟踪:

[email protected]: caught java.util.ConcurrentModificationException 
java.util.ConcurrentModificationException 
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373) 
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392) 
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391) 
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113) 
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67) 
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215) 
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180) 
    at com.mongodb.DBCollection.insert(DBCollection.java:85) 
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561) 
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864) 
    at PopulateDB.add(PopulateDB.scala:14) 
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26) 
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25) 
    at scala.actors.ReactorTask.run(ReactorTask.scala:34) 
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129) 
    at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5) 
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69) 
    at PopulateDB.resumeReceiver(PopulateDB.scala:5) 
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478) 
    at PopulateDB.searchMailbox(PopulateDB.scala:5) 
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114) 
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114) 
    at scala.actors.ReactorTask.run(ReactorTask.scala:36) 
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611) 
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325) 

感谢, 德里克

+0

你可以发布stacktrace吗?什么组件引发并发修改异常? – 2011-04-23 07:09:38

+0

谢谢。我用堆栈跟踪更新了这个问题。 – defoo 2011-04-23 07:27:14

+0

“缓冲区”和“添加”是公开的吗?我会让他们为私人/保护开始,以确保他们不会在别处被意外调用(通过非主角线程)。 – 2011-04-23 08:44:54

回答

4

DBObject是不是线程安全;你正在发送一个DBObject和你的actor消息。它可能会在稍后再次被修改,这将导致并发修改问题。

我会建议先尝试在DBObject上使用clone(),因为它会进入actor,并将其放入缓冲区。它只是一个浅拷贝,但至少应该足以导致LinkedHashMap上的并发修改问题,这些问题支持DBObject上的键(依靠LHM保持排序)。

我想尝试:

def act() { 
    loop { 
     react { 
     case info: DBObject => add(info.clone()) 

     case msg if msg == "closeConnection" => 
      println("Close connection") 
      add() 
      mongoConn.close 
     } 
    } 
    } 

如果不行,看看其他地方要修改的DBOBJECT它发送到演员后。

+0

好像克隆不适用于DBObject,这是令人惊讶的,因为我认为克隆是从AnyRef继承的。我最终使用了Synchronized {}。 – defoo 2011-04-25 01:00:58

+0

它在DBObject上不可用,但它在BasicDBObject上可用。除非您创建了自己的DBObject版本,否则BasicDBObject是Java驱动程序(由Casbah使用)提供的具体实现,并且可以使用clone()。 – 2011-04-25 01:04:41

1

为什么下面class

class PopulateDB extends Actor 

你保留多个PupulateDB演员?我期望object PopulateDB extends Actor,这样一个演员就可以集中这个任务。

除此之外,问题似乎是casbah或mongodb本身。

+0

问题似乎出现在Java驱动程序(哪个Casbah包装)内。它看起来像需要同步的东西,并不是......虽然我不知道斯卡拉演员以及阿卡的 – 2011-04-24 20:32:51

+0

@布伦丹使vals'私人',以确保他们没有在其他地方使用,但似乎两次调用'mongoCol.insert'会导致问题。除了使'PopulateDB'成为一个单身人士以外,没有什么演员能够做到这一点,以确保同时不能有多个电话。 – 2011-04-24 20:43:58

+0

@Brendan哎呀,对不起。我以为你是德里克。 – 2011-04-24 20:46:58