2016-02-18 71 views
0

我是Apache Camel的新手。在hp不间断中,有一个接收器接收由事件管理器生成的事件,假设为流。我的目标是设置一个消费者终端,它接收传入消息并通过Camel进行处理。Apache Camel创建消费者组件

另一个端点我只需要将它写入日志。从我的研究,我明白,对于消费者终点,我需要创建自己的组件和配置会像

from("myComp:receive").to("log:net.javaforge.blog.camel?level=INFO") 

这里是从事件系统接收消息,我的代码段。

Receive receive = com.tandem.ext.guardian.Receive.getInstance(); 
    byte[] maxMsg = new byte[500]; // holds largest possible request 
    short errorReturn = 0; 
    do { // read messages from $receive until last close 
     try { 
      countRead = receive.read(maxMsg, maxMsg.length); 
      String receivedMessage=new String(maxMsg, "UTF-8"); 
      //Here I need to handover receivedMessage to camel 

     } catch (ReceiveNoOpeners ex) { 
      moreOpeners = false; 
     } catch(Exception e) { 
      moreOpeners = false; 
     } 
    } while (moreOpeners); 

有人可以引导一些提示如何使这作为消费者。

回答

1

10'000英尺视图是这样的:

您需要从实施组件开始。最简单的入门方法是延长org.apache.camel.impl.DefaultComponent。你唯一要做的就是覆盖DefaultComponent::createEndpoint(..)。很明显,它所做的是创建您的端点。

所以接下来你需要的是实现你的端点。为此延伸org.apache.camel.impl.DefaultEndpoint。覆盖最低DefaultEndpoint::createConsumer(Processor)以创建自己的消费者。

最后但并非最不重要的是你需要实现消费者。同样,最好的办法是延长org.apache.camel.impl.DefaultConsumer。消费者是您的代码必须去生成您的消息的地方。通过构造函数,您将收到对端点的引用。使用端点引用创建一个新的Exchange,填充它并沿路径发送它。沿

Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
setMyMessageHeaders(ex.getIn(), myMessagemetaData); 
setMyMessageBody(ex.getIn(), myMessage); 

getAsyncProcessor().process(ex, new AsyncCallback() { 
    @Override 
    public void done(boolean doneSync) { 
     LOG.debug("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
    } 
}); 

线的东西,我建议你选择一个简单的组件(DirectComponent?)作为一个榜样。

+0

感谢您的回答,我创建了'MessageComponent','MessageEndpoint','MessageProducer'和'MessageConsumer'。 'MessageConsumer'扩展了'DefaultConsumer'。 找不到处理我的消息的方法。我是否需要将其添加到构造函数本身中? – vels4j

+0

重写'DefaultConsumer'的doStart()'和doStop()'方法来启动/停止外部消息订阅/轮询。在我的情况下,我在消费者中实现了一个回调方法,每当我收到外部消息时都会调用它。在这个例子中,我创建了header和body,并将其设置为一个新的'Exchange',如上所示,并且消息在路由下发送。 – Ralf

+0

完成,工作正常,谢谢 – vels4j

0

因此添加我自己的消费者组件可能有助于某人。

public class MessageConsumer extends DefaultConsumer { 

private final MessageEndpoint endpoint; 

private boolean moreOpeners = true; 

public MessageConsumer(MessageEndpoint endpoint, Processor processor) { 
    super(endpoint, processor); 
    this.endpoint = endpoint; 

} 


@Override 
protected void doStart() throws Exception { 

    int countRead=0; // number of bytes read 

    do { 
     countRead++; 
      String msg = String.valueOf(countRead)+" "+System.currentTimeMillis(); 
      Exchange ex = endpoint.createExchange(ExchangePattern.InOnly); 
      ex.getIn().setBody(msg); 
      getAsyncProcessor().process(ex, new AsyncCallback() { 
       @Override 
       public void done(boolean doneSync) { 
        log.info("Mssage was processed " + (doneSync ? "synchronously" : "asynchronously")); 
       } 
      }); 
      // This is an echo server so echo request back to requester  

    } while (moreOpeners); 
} 

@Override 
protected void doStop() throws Exception { 
    moreOpeners = false; 
    log.debug("Message processor is shutdown"); 
} 

}