2017-04-02 24 views
2

我有问题了解Akka持久性查询,尤其是方法eventsByTag,因为它不像我所期望的那样。为什么Akka Persisence Query Read Journal没有检索我的活动?

在我的主类中,我调用一个类来开始监听使用某个标签持久化的任何事件。

class CassandraJournal(implicit val system: ActorSystem) { 

def engageStreaming = { 
    val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) 
    implicit val mat = ActorMaterializer() 

    readJournal.eventsByTag("account", Offset.noOffset) 
    .runForeach { event => println(event) } 
    } 
} 

每当我开始我的服务器和我的事件存储是空的,我坚持我的第一个事件(通过调用HTTP服务,建于阿卡HTTP),该事件确实被打印出来。但是,当我重新启动服务器并且事件存储中已有事件时,新的持久事件将不会打印。

对此有解释吗?我很难弄清楚为什么会发生这种情况。

编辑

事件我使用的商店是卡桑德拉。这里是PersistentActor(我不使用事件适配器来标记事件,只是包装他们周围一个标记())

class Account(id: UUID) extends PersistentActor { 

    override def receiveRecover: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     println("Creating checkings account") 
    } 

    override def receiveCommand: Receive = { 
    case createCheckingsAccount: CreateCheckingsAccount => 
     persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event => 
     val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated] 
     sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString) 
     } 

    } 

    def updateState(evt: Event): Unit = { 
    } 

    override def persistenceId: String = s"account-$id" 
} 
+0

使用事件适配器,持久角色以及您用于持久性的数据存储的更新问题。 –

回答

2

随着receiveRecover没有做必要的状态恢复工作,坚持不懈,就不能正常工作。我建议在receiveRecover中加入一些基本的状态恢复逻辑,并让你的updateState方法覆盖标记的事件情况。

我在一个类似于下面的状态恢复逻辑的应用中使用了eventsByTag,它在新的启动和恢复方面都工作得很好。

def updateState(e: Any): Unit = e match { 
    case evt: Event => 
    state = state.updated(evt) 
    case Tagged(evt: Event, _) => 
    state = state.updated(evt) 
} 

... 

override def receiveRecover: Receive = { 
    case evt: Event => updateState(evt) 
    case taggedEvt: Tagged => updateState(taggedEvt) 
} 
+0

通过设置一些适当的恢复工作,我能够得到它的工作。我并没有意识到复苏将重新开始。谢谢你的帮助! –

相关问题