2017-08-08 50 views
2

当我执行我的弗林克应用它给了我这个NullPointerException阿帕奇弗林克:NullPointerException异常引起TupleSerializer

2017-08-08 13:21:57,690 INFO com.datastax.driver.core.Cluster - New Cassandra host /127.0.0.1:9042 added 
2017-08-08 13:22:02,427 INFO org.apache.flink.runtime.taskmanager.Task      - TriggerWindow(TumblingEventTimeWindows(30000), ListStateDescriptor{seria[email protected]15d1c80b}, EventTimeTrigger(), WindowedStream.apply(CoGroupedStreams.java:302)) -> Filter -> Flat Map -> Sink: Cassandra Sink (1/1) (092a7ef50209f7a050d9d82be1e03d80) switched from RUNNING to FAILED. 
java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) 
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) 
    at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:400) 
    at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:682) 
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:43) 
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:31) 
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597) 
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504) 
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) 
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) 
    ... 7 more 
Caused by: java.lang.NullPointerException 
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104) 
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30) 
    at   org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526) 
    ... 22 more 

回答

2

弗林克的元组串行器不支持空值。你应该检查一个元组是否包含一个空字段。

元组的替代方法是POJO或类型。 支持任意多个可为空的字段,但需要明确的类型说明。

+0

你是对的,谢谢。 – Zizou

相关问题