2017-08-22 32 views

I am sending a stream of data from a VM to the Kafka topic "test", and receiving that stream in a Flink program running on the host OS. I am running into a deserialization problem.

public class WriteToKafka { 

    public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor 
     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); 

     DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env 
       .addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); 

     Properties properties = new Properties(); 

     properties.setProperty("bootstrap.servers", "192.168.0.12:9092"); 
     properties.setProperty("zookeeper.connect", "192.168.0.12:2181"); 
     properties.setProperty("group.id", "test"); 

     DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator()); 
     stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties)); 

     env.execute(); 
    } //main
} //WriteToKafka
The code above sends events to Kafka; the host OS is at IP 192.168.0.12.

joinedStreamEventDataStream is of type DataStream<Tuple3<Integer,Integer,Integer>>; it is basically a join of the two streams respirationRateStream and heartRateStream.

public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) { 
     Patient_id = patient_id; 
     HeartRate = heartRate; 
     RespirationRate = respirationRate; 
} 

There is another Flink program, running on the host OS, that tries to read the data stream from Kafka. I use localhost here because Kafka and ZooKeeper are both running on the host OS.

public class ReadFromKafka { 


    public static void main(String[] args) throws Exception { 
     // create execution environment 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "test"); 



     DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties)); 

     /* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", 
       new , properties));*/ 

     message.print(); 


     env.execute(); 


    } //main 
} //ReadFromKafka 

The output I get looks like this:

[screenshot of the console output]

I think I need to implement a deserializer for the JoinedStreamEvent type. Can someone give me an idea of how I should write a deserializer that decodes JoinedStreamEvent, i.e. a DataStream<Tuple3<Integer, Integer, Integer>>?

Please let me know if there is anything else that needs to be done.

P.S. - I thought of writing the following deserializer, but I don't think it is correct:

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", 
       new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties)); 

`TypeInformationSerializationSchema` should also do the job, but you need to pass the `TypeInformation` to the constructor: `TypeInformation.of(JoinedStreamEvent.class)` – twalthr
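Following that comment, the consumer source in ReadFromKafka would presumably become something like the fragment below (a sketch only; it assumes the same "test" topic and `properties` as in the question, and that JoinedStreamEvent is handled by Flink's type system):

```java
// Sketch based on the comment: pass the TypeInformation explicitly
// instead of calling the schema's constructor with no arguments.
DataStream<JoinedStreamEvent> message = env.addSource(
    new FlinkKafkaConsumer09<>(
        "test",
        new TypeInformationSerializationSchema<>(
            TypeInformation.of(JoinedStreamEvent.class), env.getConfig()),
        properties));
```

Note that for this to decode correctly, the producer side must use the same schema, so both ends agree on the byte format.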

Answer


I was able to receive events from the VM in the same format by writing a custom serializer/deserializer and using it in both the VM program and the host-OS program, as shown below:

public class JoinSchema implements DeserializationSchema<JoinedStreamEvent> , SerializationSchema<JoinedStreamEvent> { 


    @Override 
    public JoinedStreamEvent deserialize(byte[] bytes) throws IOException { 
     return JoinedStreamEvent.fromstring(new String(bytes)); 
    } 

    @Override 
    public boolean isEndOfStream(JoinedStreamEvent joinedStreamEvent) { 
     return false; 
    } 

    @Override 
    public TypeInformation<JoinedStreamEvent> getProducedType() { 
     return TypeExtractor.getForClass(JoinedStreamEvent.class); 
    } 

    @Override 
    public byte[] serialize(JoinedStreamEvent joinedStreamEvent) { 
     return joinedStreamEvent.toString().getBytes(); 
    } 
} //JoinSchema 

Note that you may need to write a fromstring() method in your event type; I added the following fromstring() method to the JoinedStreamEvent class:

public static JoinedStreamEvent fromstring(String line) { 

     String[] token = line.split(","); 

     Integer val1 = Integer.valueOf(token[0]); 
     Integer val2 = Integer.valueOf(token[1]); 
     Integer val3 = Integer.valueOf(token[2]); 

     return new JoinedStreamEvent(val1, val2, val3); 
} //fromstring 
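Since JoinSchema's serialize() goes through toString() and its deserialize() goes through fromstring(), the two methods must agree on the comma-separated format, so the event class also needs a matching toString(). A minimal, self-contained sketch of that round trip (the stand-in class and its "id,heartRate,respirationRate" format are assumptions, since the full JoinedStreamEvent class is not shown):

```java
// Standalone sketch of the round trip JoinSchema relies on:
// toString() and fromstring() must produce/consume the same format.
public class RoundTripSketch {

    // Minimal stand-in for the JoinedStreamEvent class in the answer.
    static class JoinedStreamEvent {
        final Integer patientId, heartRate, respirationRate;

        JoinedStreamEvent(Integer patientId, Integer heartRate, Integer respirationRate) {
            this.patientId = patientId;
            this.heartRate = heartRate;
            this.respirationRate = respirationRate;
        }

        // Assumed format: "patientId,heartRate,respirationRate"
        @Override
        public String toString() {
            return patientId + "," + heartRate + "," + respirationRate;
        }

        static JoinedStreamEvent fromstring(String line) {
            String[] token = line.split(",");
            return new JoinedStreamEvent(
                Integer.valueOf(token[0]),
                Integer.valueOf(token[1]),
                Integer.valueOf(token[2]));
        }
    }

    public static void main(String[] args) {
        JoinedStreamEvent original = new JoinedStreamEvent(1, 72, 16);

        // serialize() in JoinSchema: event -> toString() -> bytes
        byte[] bytes = original.toString().getBytes();

        // deserialize() in JoinSchema: bytes -> String -> fromstring()
        JoinedStreamEvent decoded = JoinedStreamEvent.fromstring(new String(bytes));

        System.out.println(decoded); // prints "1,72,16"
    }
}
```

If toString() ever changes (extra fields, different separator) without a matching change in fromstring(), deserialization will break, so it is worth keeping the two methods next to each other in the class.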

The events were sent from the VM using the following code:

stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new JoinSchema(), properties)); 

The events were received on the host OS using the following code:

public static void main(String[] args) throws Exception { 
    // create execution environment 
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    Properties properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "localhost:9092"); 
    properties.setProperty("zookeeper.connect", "localhost:2181"); 
    properties.setProperty("group.id", "test"); 


    DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test", 
      new JoinSchema(), properties)); 

    message.print(); 



    env.execute(); 


} //main 