0

我是新来的apache骆驼和apache kafka,并为我的项目做了一个小POC。当尝试使用Camel-kafka组件读取kafka时,我遇到以下问题错误日志。错误因为错误而关闭了/127.0.0.1套接字(kafka.network.Processor)

[2016-01-20 08:47:10,979] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2016-01-20 08:47:44,643] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor) 
[2016-01-20 08:47:54,545] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor) 
java.io.IOException: Broken pipe 
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) 
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) 
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) 
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) 
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) 
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123) 
    at kafka.network.MultiSend.writeTo(Transmission.scala:101) 
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231) 
    at kafka.network.Processor.write(SocketServer.scala:472) 
    at kafka.network.Processor.run(SocketServer.scala:342) 
    at java.lang.Thread.run(Thread.java:745) 

我的Java代码如下:

public class Main { 
public static void main(String[] args) throws Exception { 

    CamelContext context = new DefaultCamelContext(); 
    context.addRoutes(new RouteBuilder() { 
     public void configure() { 
      from("kafka:127.0.0.1:9092?topic=TEST&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1") 
        /*.marshal(xmlJsonFormat)*/ 
      .process(new XmlToJson()) 
        /*.to("kafka:localhost:9092?topic=TestJson&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1");*/ 
      .to("file:/Users/himanshu/Desktop/TransCamelFuse/test.txt"); 
     } 
    }); 
    context.start(); 
    Thread.sleep(10000); 
    context.stop(); 
} 

}

我已经把卡夫卡制片人控制台工具,有些TXT和尝试使用的卡夫卡骆驼组成部分阅读。

回答

0

这不是错误。给出这个错误是因为你给的数据类型。您正在逐个手动向Kafka提供数据。

相关问题