2012-07-10 68 views
1

我有一个应用程序,使用akka,现在我想通过套接字连接来连接它。因此,我使用类似于the one from the scala page的机制。 但是,如果我尝试tell,而我打开OutputStream,目标没有收到任何消息。套接字连接和ActorSystem

这里是我的源代码:

object Connector { 

    def main(args: Array[String]) { 
    val port = 1337 
    val conf = ConfigFactory.load 
    val system = ActorSystem("SDDB", conf.getConfig("SDDB")) 
    val master = system.actorOf(Props[TestActor]) 
    master ! "a" 

    try { 
     val listener = new ServerSocket(port) 
     println("listening on port: " + port) 
     while (true) 
     new ConnectionThread(listener accept, master).start 
     listener close 
    } catch { 
     case e: IOException => 
     System.err.println("Could not listen on port: " + port + ".") 
     System.exit(-1) 
    } finally { 
     system.shutdown 
    } 
    } 
} 

case class ConnectionThread(socket: Socket, master: ActorRef) 
    extends Thread("ConnectionThread") { 

    private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r 
    private implicit var id = 0L 
    private implicit val timeout = Timeout(25.0 seconds) 

    master ! "b" 

    override def run { 
    master ! "c" 
    try{ 
     master ! "d" 
     val in = new ObjectInputStream(socket getInputStream) 
     master ! "e" 
     val out = new ObjectOutputStream(socket getOutputStream) 

     out writeObject("listening") 
     out flush 

     master ! "f" 
     val command = in.readObject.asInstanceOf[String] 
     println("client sent: '" + command + "'") 
     // process the command 

     master ! "g" 
     out.writeObject("EOF") 
     out.flush 

     out.close 
     in.close 
     socket.close 
    } catch { 
     case e: SocketException => 
     case e: IOException => e printStackTrace 
    } 
    } 
} 

class TestActor extends Actor with ActorLogging{ 

    log info("TestActor running") 

    def receive = { 
    case s: String => 
     log info("received: " + s) 
    } 

} 

我得到的输出:

listening on port: 1337 
[INFO] TestActor running 
[INFO] received: a 
[INFO] received: b 
[INFO] received: c 
[INFO] received: d 

现在,我希望它继续下去,直到克,而是我得到:

client sent: 'select content from testdata on 2012-07-06' 

我想通了,直到我打开一个插座流,可能是因为tellask也是基于套接字的,并且使用套接字的输出流,踏入运行。之后,套接字连接起作用,但是我不能将任何消息发送给参与者系统。
有没有办法让我放弃连接器和ConnectionThread。我该如何解决它?

+0

http://doc.akka.io/docs/akka/2.0.2/scala/io.html – 2012-07-10 12:24:27

+0

感谢您的快速回答。不幸的是,我无法从文档中获得我代码中的更改。你能给我一个提示如何改变它吗? – 2012-07-11 07:23:27

+0

我可以回答特定问题,但我没有时间为您编写解决方案。 – 2012-07-11 10:39:12

回答

0

我必须承认,我并没有完全理解文档中的例子。但我发现使用ConnectionHelper而不是直接处理ActorRef的工作非常好。
我改变了我的代码如下:

object Connector { 

    def main(args: Array[String]) { 
    val port = 1337 
    val conf = ConfigFactory.load 
    val system = ActorSystem("SDDB", conf.getConfig("SDDB")) 

    // val master = system.actorOf(Props[TestActor], "master") 
    // master ! "a" 

    try { 
     val listener = new ServerSocket(port) 
     println("listening on port: " + port) 
     while (true) 
     //  new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start 
     new ConnectionThread(listener accept, system).start 
     listener close 
    } catch { 
     case e: IOException => 
     System.err.println("Could not listen on port: " + port + ".") 
     System.exit(-1) 
    } finally { 
     //  master ! PoisonPill 
     system.shutdown 
    } 
    } 

} 

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
    extends Thread("ConnectionThread") { 

    private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r 
    private implicit var id = 0L 
    private implicit val timeout = Timeout(25.0 seconds) 
    private val conHelper = new ConnectionHelper 

    override def run { 
    try { 
     val out = new ObjectOutputStream(socket getOutputStream) 
     val in = new ObjectInputStream(socket getInputStream) 

     conHelper tell "funzt" 
     out writeObject ("Hi") 
     out.flush 
     val command = in.readObject.asInstanceOf[String] 
     println("received: " + command) 
     out writeObject ("test") 
     out.flush 
     out writeObject ("EOF") 
     out.flush 

     out.close 
     in.close 
     socket.close 
    } 
    } 

    private class ConnectionHelper { 
    val tester = sys.actorOf(Props[TestActor]) 

    def tell(s: String) { tester ! s } 

    } 

} 

我真的不明白为什么这个工程,并从我的问题的代码不会。我欢迎所有的解释。