2017-03-18

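I am new to Akka and I am trying to dispatch event messages to an Akka actor. I send three event messages one after another, but only the first one is handled; the other two are never dispatched to their handlers.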

This may be because of the receive(receiveEvent); call in my EventProcessActor constructor.

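But the other events are sent after that as well, so I am missing something here: why are they not dispatched to the other matching handlers?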

I always get the following console output:

[INFO] [03/18/2017 13:35:53.446]... We received the Events need to process it 

My expected output is:

[INFO] [03/18/2017 13:35:53.446] ... We received the Events need to process it 

[INFO] [03/18/2017 13:35:53.447]... We are processing Events 

[INFO] [03/18/2017 13:35:53.446]... Completed Events processing 

In the console output above I have removed [default-akka.actor.default-dispatcher-4] [akka://default/user/EventProcessing]...

I trigger the events as follows:

procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new EventProcessActor.EventActivity(Events.READING_LINE, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR, Paths.get("/")), procsssEvents); 

Below are my actor class, my message class, and my pom.xml file.

AkkaActor:

package com.ebc.biz.akka.event.trigger; 

import java.io.IOException; 
import java.nio.file.Path; 
import java.nio.file.Paths; 

import scala.PartialFunction; 
import scala.runtime.BoxedUnit; 

import akka.actor.AbstractLoggingActor; 
import akka.actor.ActorRef; 
import akka.actor.ActorSystem; 
import akka.actor.ActorSystemImpl; 
import akka.actor.Props; 
import akka.japi.pf.ReceiveBuilder; 

import static com.ebc.biz.akka.event.trigger.EventMessage.Events; 

public class EventProcessActor extends AbstractLoggingActor { 

    public static class EventActivity { 
     final EventMessage startOfEventMessage; 

     public EventMessage getStartOfEventMessage() { 
      return startOfEventMessage; 
     } 

     public EventActivity(Events events, Path eventPath) { 
      startOfEventMessage = new EventMessage(events, eventPath); 
     } 

    } 

    public static class EventReadingActivity { 

     final EventMessage startOfReadingMessage; 

     public EventMessage getStartOfReadingMessage() { 
      return startOfReadingMessage; 
     } 

     public EventReadingActivity(Events events, Path eventPath) { 
      startOfReadingMessage = new EventMessage(events, eventPath); 

     } 

    } 

    public static class EndOfEventActivity { 

     final EventMessage endOfEventMessage; 

     public EventMessage getEndOfEventMessage() { 
      return endOfEventMessage; 
     } 

     public EndOfEventActivity(Events events, Path eventPath) { 
      endOfEventMessage = new EventMessage(Events.END_OR, eventPath); 

     } 
    } 

    private final PartialFunction<Object, BoxedUnit> receiveEvent; 

    private final PartialFunction<Object, BoxedUnit> startEventsProcessing; 

    private final PartialFunction<Object, BoxedUnit> completeEventProcessing; 

    public EventProcessActor() { 

     receiveEvent = ReceiveBuilder 
       .match(EventActivity.class, this::onStartEventReceive) 
       .match(EventReadingActivity.class, this::readEventLine).build(); 

     startEventsProcessing = ReceiveBuilder 
       .match(EventReadingActivity.class, this::readEventLine) 
       .match(EndOfEventActivity.class, this::onEndOfEventProcessing) 
       .build(); 

     completeEventProcessing = ReceiveBuilder.match(
       EndOfEventActivity.class, this::onEndOfEventProcessing).build(); 

     receive(receiveEvent); 
    } 

    public static Props props() { 

     return Props.create(EventProcessActor.class); 
    } 

    public void onStartEventReceive(EventActivity fileActivity) { 
     log().info("We received the Events need to process it"); 
     getContext().become(startEventsProcessing); 
    } 

    public void readEventLine(EventReadingActivity fileActivity) { 
     log().info("We are processing Events"); 
     getContext().become(completeEventProcessing); 

    } 

    public void onEndOfEventProcessing(EndOfEventActivity fileActivity) { 
     log().info("Completed Events processing"); 

    } 

    public static void main(String args[]) throws IOException { 

     ActorSystem syste = ActorSystemImpl.create(); 
     final ActorRef procsssEvents = syste.actorOf(EventProcessActor.props(), 
       "Event" + "Processing"); 

     procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, 
       Paths.get("/")), procsssEvents); 
     procsssEvents.tell(new EventProcessActor.EventActivity(
       Events.READING_LINE, Paths.get("/")), procsssEvents); 
     procsssEvents.tell(new EventProcessActor.EventActivity(Events.END_OR, 
       Paths.get("/")), procsssEvents); 

     System.out.println("Enter to terminate"); 
     System.in.read(); 

    } 

} 

EventMessage:

package com.ebc.biz.akka.event.trigger; 

import java.nio.file.Path; 

public class EventMessage { 

    public static enum Events { 

     STSRT, READING_LINE, END_OR; 

    } 

    private final Events readEvents; 
    private final Path pathOfEvents; 

    public Path getPathOfEvents() { 
     return pathOfEvents; 
    } 

    public Events getReadEvents() { 
     return readEvents; 
    } 

    public EventMessage(Events readEvents, Path pathOfFile) { 
     this.readEvents = readEvents; 
     this.pathOfEvents = pathOfFile; 
    } 

} 

pom.xml:

<groupId>com.ebc.biz</groupId> 
    <artifactId>akka.event.trigger</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <properties> 
     <akka.version>2.4.9</akka.version> 
     <maven-dependency-plugin.version>3.0.0</maven-dependency-plugin.version> 
     <maven.compiler.plugin>3.6.1</maven.compiler.plugin> 
     <java.compiler.target>1.8</java.compiler.target> 
     <java.compiler.source>1.8</java.compiler.source> 
    </properties> 
    <dependencies> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-actor_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-core_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-experimental_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-http-jackson-experimental_2.11</artifactId> 
      <version>${akka.version}</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-dependency-plugin</artifactId> 
      <version>${maven-dependency-plugin.version}</version> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <!-- This will download source so easy to see API and java doc. --> 
       <artifactId>maven-source-plugin</artifactId> 
       <executions> 
        <execution> 
         <id>attach-sources</id> 
         <phase>verify</phase> 
         <goals> 
          <goal>jar</goal> 
         </goals> 
        </execution> 
       </executions> 
      </plugin> 
      <!-- Java 8 compiler plugin --> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>${maven.compiler.plugin}</version> 
       <configuration> 
        <source>${java.compiler.source}</source> 
        <target>${java.compiler.target}</target> 
       </configuration> 
      </plugin> 

     </plugins> 
    </build> 
</project> 

Why are my messages not dispatched one after another? I think I am missing something.

Thanks in advance for any information and help.

Answer

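Edit: the problem is that you send an EventActivity every time, while the actor is designed to expect a different message type at each stage. After the first EventActivity the actor switches to startEventsProcessing, which no longer matches EventActivity, so the next two messages go unhandled. You should update main: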

procsssEvents.tell(new EventProcessActor.EventActivity(Events.STSRT, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new EventProcessActor.EventReadingActivity(Events.READING_LINE, Paths.get("/")), procsssEvents); 
procsssEvents.tell(new EventProcessActor.EndOfEventActivity(Events.END_OR, Paths.get("/")), procsssEvents); 
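
To see why the message type matters, here is a minimal plain-Java sketch of the behavior switching. It does not use Akka at all: the class BecomeSketch, its tell and run methods, the empty message classes, and the "unhandled: ..." log entries are all hypothetical stand-ins invented for illustration. Each map plays the role of one ReceiveBuilder chain from the question, and assigning to current plays the role of getContext().become(...). In real Akka an unmatched message is passed to unhandled, which by default publishes it to the event stream instead of logging at INFO, so it silently disappears from the console output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the actor in the question; NOT Akka code.
public class BecomeSketch {

    // Hypothetical empty stand-ins for the three message classes.
    static final class EventActivity {}
    static final class EventReadingActivity {}
    static final class EndOfEventActivity {}

    // Each map mimics one ReceiveBuilder chain: message class -> handler.
    private final Map<Class<?>, Runnable> receiveEvent = new HashMap<>();
    private final Map<Class<?>, Runnable> startEventsProcessing = new HashMap<>();
    private final Map<Class<?>, Runnable> completeEventProcessing = new HashMap<>();

    // The active behavior; reassigning it mimics getContext().become(...).
    private Map<Class<?>, Runnable> current = receiveEvent;
    private final List<String> log = new ArrayList<>();

    BecomeSketch() {
        receiveEvent.put(EventActivity.class, this::onStartEventReceive);
        receiveEvent.put(EventReadingActivity.class, this::readEventLine);
        startEventsProcessing.put(EventReadingActivity.class, this::readEventLine);
        startEventsProcessing.put(EndOfEventActivity.class, this::onEndOfEventProcessing);
        completeEventProcessing.put(EndOfEventActivity.class, this::onEndOfEventProcessing);
    }

    private void onStartEventReceive() {
        log.add("We received the Events need to process it");
        current = startEventsProcessing;
    }

    private void readEventLine() {
        log.add("We are processing Events");
        current = completeEventProcessing;
    }

    private void onEndOfEventProcessing() {
        log.add("Completed Events processing");
    }

    // Dispatch on the message class using only the currently active behavior.
    void tell(Object message) {
        Runnable handler = current.get(message.getClass());
        if (handler == null) {
            log.add("unhandled: " + message.getClass().getSimpleName());
        } else {
            handler.run();
        }
    }

    // Sends the messages in order to a fresh instance and returns its log.
    static List<String> run(Object... messages) {
        BecomeSketch actor = new BecomeSketch();
        for (Object message : messages) {
            actor.tell(message);
        }
        return actor.log;
    }

    public static void main(String[] args) {
        // What the question's main does: the same type three times.
        System.out.println(run(new EventActivity(), new EventActivity(), new EventActivity()));
        // What the corrected main does: one type per stage.
        System.out.println(run(new EventActivity(), new EventReadingActivity(), new EndOfEventActivity()));
    }
}
```

With three EventActivity messages only the first finds a handler, because the active behavior has already changed by the time the second arrives. With one message type per stage, all three handlers run in order.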

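If I do not call receive(receiveEvent); in the EventProcessActor constructor, I get an exception, and all the messages sent from the main method end up as "dead letters encountered". Only the first message is handled, because receive(receiveEvent); is the first thing I call. I also tried putting a thread sleep before each tell call, with the same result: only the first event is logged or triggered. I tried Thread.sleep(2000) before each call as well.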

Right, the problem was elsewhere; I have updated the answer. – MirMasej