2016-07-06 58 views
0

数据写到一个简单的例子弗林克读取卡夫卡

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val properties = new Properties() 
properties.setProperty("bootstrap.servers","xxxxxx") 
properties.setProperty("zookeeper.connect","xxxxxx") 
properties.setProperty("group.id", "caffrey") 
val stream = env 
    .addSource(new FlinkKafkaConsumer082[String]("topic", new SimpleStringSchema(), properties)) 
.print() 

env.execute("Flink Kafka Example") 

当运行此代码我收到一个错误这样的:

[错误]类 org.apache.flink .streaming.api.checkpoint.CheckpointNotifier未找到 - 继续使用存根。

我谷歌这个错误,找到CheckpointNotifier是一个interface。 我真的不明白我在哪里做错了。

回答

2

由于CheckpointNotifier是旧版Flink版本的一个类,我怀疑你在你的pom文件中混合了不同的Flink依赖关系。

确保所有Flink依赖项具有相同的版本。

+0

我更改降级flink版本,它的作品谢谢! – user2341602