我有问题了解Akka持久性查询,尤其是方法eventsByTag,因为它不像我所期望的那样。为什么Akka Persisence Query Read Journal没有检索我的活动?
在我的主类中,我调用一个类来开始监听使用某个标签持久化的任何事件。
class CassandraJournal(implicit val system: ActorSystem) {
def engageStreaming = {
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
implicit val mat = ActorMaterializer()
readJournal.eventsByTag("account", Offset.noOffset)
.runForeach { event => println(event) }
}
}
每当我开始我的服务器和我的事件存储是空的,我坚持我的第一个事件(通过调用HTTP服务,建于阿卡HTTP),该事件确实被打印出来。但是,当我重新启动服务器并且事件存储中已有事件时,新的持久事件将不会打印。
对此有解释吗?我很难弄清楚为什么会发生这种情况。
编辑
事件我使用的商店是卡桑德拉。这里是PersistentActor(我不使用事件适配器来标记事件,只是包装他们周围一个标记())
class Account(id: UUID) extends PersistentActor {
override def receiveRecover: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
println("Creating checkings account")
}
override def receiveCommand: Receive = {
case createCheckingsAccount: CreateCheckingsAccount =>
persist(Tagged(CheckingsAccountCreated(id), Set("account"))) { event =>
val checkingsAccountCreatedEvent = event.payload.asInstanceOf[CheckingsAccountCreated]
sender ! CreateCheckingsAccountResponse(checkingsAccountCreatedEvent.id.toString)
}
}
def updateState(evt: Event): Unit = {
}
override def persistenceId: String = s"account-$id"
}
使用事件适配器,持久角色以及您用于持久性的数据存储的更新问题。 –