2013-02-15 49 views
1

我遇到了Camel的FTP2组件问题,消费者演员居住在Akka系统中。Akka + Camel + FTP2 + localWorkingDirectory无法可靠运行

基本的想法是监视一个FTP目录的文件,然后产生一个小孩演员分别处理每个文件。 Akka正在用于管理并发性和可靠性。父消费者使用noop = true轮询目录,所以它不做任何事情,那么儿童消费者应该下载文件,并用'include'骆驼选项进行过滤。下载是并发的,重要的是文件不要加载到内存中(因此使用localWorkDirectory)。

我写了一个简单的摄制:

package camelrepro; 

import java.io.InputStream; 

import org.mockftpserver.core.command.Command; 
import org.mockftpserver.core.command.ReplyCodes; 
import org.mockftpserver.core.session.Session; 
import org.mockftpserver.core.session.SessionKeys; 
import org.mockftpserver.fake.FakeFtpServer; 
import org.mockftpserver.fake.UserAccount; 
import org.mockftpserver.fake.command.AbstractFakeCommandHandler; 
import org.mockftpserver.fake.filesystem.FileEntry; 
import org.mockftpserver.fake.filesystem.UnixFakeFileSystem; 

import akka.actor.ActorSystem; 
import akka.actor.Props; 
import akka.camel.CamelMessage; 
import akka.camel.javaapi.UntypedConsumerActor; 
import akka.testkit.JavaTestKit; 

public class Main { 

    public static class ParentActor extends UntypedConsumerActor { 

     public ParentActor() { 
      System.out.println("Parent started"); 
     } 
     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&noop=true"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       getContext().actorOf(new Props(ChildActor.class), "0"); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static class ChildActor extends UntypedConsumerActor { 

     public ChildActor() { 
      System.out.println("Child started"); 
     } 

     @Override 
     public String getEndpointUri() { 
      return "ftp://[email protected]:8021?password=password&readLock=changed&initialDelay=0&delay=200&include=test.txt&localWorkDirectory=/tmp"; 
     } 

     @Override 
     public void onReceive(Object msg) throws Exception { 
      if (msg instanceof CamelMessage) { 
       System.out.println("Child got message"); 
       CamelMessage camelMsg = (CamelMessage) msg; 

       InputStream source = camelMsg.getBodyAs(InputStream.class, getCamelContext()); 
       System.out.println(source.getClass().getName()); 
       System.exit(0); 
      } else { 
       unhandled(msg); 
      } 
     } 
    } 

    public static void main(String[] args) { 

     ActorSystem system = ActorSystem.create("default"); 

     FakeFtpServer ftpServer = new FakeFtpServer(); 
     UnixFakeFileSystem ftpFileSystem = new UnixFakeFileSystem(); 
     ftpServer.setFileSystem(ftpFileSystem); 
     ftpServer.addUserAccount(new UserAccount("anonymous", "password", "/")); 
     ftpServer.setServerControlPort(8021); 

     // fix bug in PWD handling (either Apache FTP client or mock server depending on opinion) 
     ftpServer.setCommandHandler("PWD", new AbstractFakeCommandHandler() { 
      @Override 
      protected void handle(Command command, Session session) { 
       String currentDirectory = (String) session.getAttribute(SessionKeys.CURRENT_DIRECTORY); 
       this.replyCodeForFileSystemException = ReplyCodes.READ_FILE_ERROR; 
       verifyFileSystemCondition(notNullOrEmpty(currentDirectory), currentDirectory, "filesystem.currentDirectoryNotSet"); 
       int replyCode = ReplyCodes.PWD_OK; 
       String replyText = String.format("\"%s\" OK", currentDirectory.replaceAll("\"", "\"\"")); 
       session.sendReply(replyCode, replyText); 
      } 
     }); 
     ftpFileSystem.add(new FileEntry("/test.txt", "hello world")); 
     ftpServer.start(); 

     new JavaTestKit(system) {{ 
      getSystem().actorOf(new Props(ParentActor.class)); 
     }}; 
    } 
} 

Maven依赖显示的版本:

<dependencies> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-actor_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-remote_2.10</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-camel_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>com.typesafe.akka</groupId> 
      <artifactId>akka-testkit_2.10</artifactId> 
      <version>2.1.0</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.camel</groupId> 
      <artifactId>camel-ftp</artifactId> 
      <version>2.10.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.mockftpserver</groupId> 
      <artifactId>MockFtpServer</artifactId> 
      <version>2.4</version> 
      <scope>test</scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.commons</groupId> 
      <artifactId>commons-io</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>commons-net</groupId> 
      <artifactId>commons-net</artifactId> 
      <version>3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.slf4j</groupId> 
      <artifactId>slf4j-simple</artifactId> 
      <version>1.7.2</version> 
     </dependency> 
    </dependencies> 

我希望看到的BufferedInputStream写到标准输出 - 并检查ByteArrayInputStream的不是。

但是,相反,我看到的文件没有发现异常:

[ERROR] [02/15/2013 10:53:32.951] [default-akka.actor.default-dispatcher-7] [akka://default/user/$a/0] Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
org.apache.camel.TypeConversionException: Error during type conversion from type: org.apache.camel.component.file.remote.RemoteFile to the required type: java.io.InputStream with value GenericFile[test.txt] due java.io.FileNotFoundException: /tmp/test.txt (No such file or directory) 
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:162) 

有好几次,它的工作,领导让我怀疑这可能是一个种族的地方。但它几乎总是失败。

任何线索,想法,建议?

FWIW:

uname -a: Linux 3.2.0-37-generiC#58-Ubuntu SMP Thu Jan 24 15:28:10 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux 
java: 1.7.0_11-b21 

回答

2

我找到了解决上述问题的方法。

这是一个事实,儿童消费者autoAck()返回true(它默认情况下)。在这种情况下,akka-camel会发送CamelMessage即时消息,并继续进行清理。与此同时,儿童演员实际上并没有打开InputStream,直到由getBodyAs()调用的其中一个类型转换器将其打开。因此,在通过getBodyAs()打开文件的子actor和异步发送消息后删除文件的Camel清理之间存在竞争。

因此,修复方法是覆盖autoAck()以返回false,并在子消息处理程序的结尾处发送Ack.getInstance()(或new Status.Failure(<cause>),如果您愿意)。

1

使用骆驼2.10.2,因为在2.10.3与FTP成分的问题

+0

好主意,但不幸的是,行为与2.10.2相同。 – 2013-02-15 17:16:26

0

当使用localWorkDirectory =/TMP则该目录是用于存储文件的临时在路由期间。当骆驼交易所完成时,文件被删除。我不确定这是如何与Akka这是异步事件。所以在Camel交换完成后,Akka onReceive可能被称为异步,因此临时文件被删除。而不是:

在骆驼你会路线的文件,以更permament自然

from("ftp:...") 
    .to("file:inbox") 

的filke位置,然后你可以从阿卡(“收件箱文件”)消耗。

+0

在这种情况下转移是否同时发生,并且在例如情况下是可靠的。 ftp服务器偶尔会死亡?我曾经这样设置过,但是我的老板不喜欢直接骆驼路线的方法,想要使用akka进行容错:) – 2013-02-16 11:26:14