我从虚拟机发送数据流,以卡夫卡的测试题目用下面从VM插座卡夫卡发送的数据流中,并接收对主机操作系统的弗林克计划:反序列化问题
public class WriteToKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
.addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
properties.setProperty("group.id", "test");
DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator());
stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties));
env.execute();
}
代码(上在192.168.0.12的IP主机操作系统上运行)
JoinedStreamEvent
是DataSream<Tuple3<Integer,Integer,Integer>>
型的,它基本上加入2流respirationRateStream
和heartRateStream
public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
Patient_id = patient_id;
HeartRate = heartRate;
RespirationRate = respirationRate;
有是在主机操作系统上运行的尝试读取从卡夫卡的数据流中的另一个弗林克程序。我在这里使用localhost,因为kafka和zookeper都在主机操作系统上运行。
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));
/* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new , properties));*/
message.print();
env.execute();
} //main
} //ReadFromKafka
我得到的输出是这样的
我想我需要实现JoinedStreamEvent
类型的解串器。有人能给我一个想法,我应该怎么写,解码器JoinedStreamEvent
类型DataSream<Tuple3<Integer, Integer, Integer>>
请让我知道如果还有其他事情需要做。
P.S. - 我认为写作以下解串器,但我不认为这是正确的
DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
'TypeInformationSerializationSchema'也应该做的工作,但你需要将'TypeInformation'传递给构造函数:' TypeInformation.of(JoinedStreamEvent.class)' – twalthr