2017-07-25 70 views
1

我想流csv文件并使用flink执行sql操作。但我写的代码只读取一次并停止。它不流。在此先感谢,Flink CsvTableSource Streaming

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

StreamTableEnvironment tableEnv = StreamTableEnvironment.getTableEnvironment(env); 

CsvTableSource csvtable = CsvTableSource.builder() 
    .path("D:/employee.csv") 
    .ignoreFirstLine() 
    .fieldDelimiter(",") 
    .field("id", Types.INT()) 
    .field("name", Types.STRING()) 
    .field("designation", Types.STRING()) 
    .field("age", Types.INT()) 
    .field("location", Types.STRING()) 
    .build(); 

tableEnv.registerTableSource("employee", csvtable); 

Table table = tableEnv.scan("employee").where("name='jay'").select("id,name,location"); 
//Table table1 = tableEnv.scan("employee").where("age > 23").select("id,name,age,location"); 

DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class); 

//DataStream<Row> stream1 = tableEnv.toAppendStream(table1, Row.class); 

stream.print(); 
//stream1.print(); 

env.execute(); 

回答

2

CsvTableSource是基于FileInputFormat其读取并分析由行引用的文件一致。结果行被转发到流式查询中。所以在CsvTableSource是流的意思,行不断读取和转发。但是,CsvTableSource终止于文件的末尾。因此,它发出一个有界的流。

我认为您期望的行为是CsvTableSource直到它结束读取文件,然后等待将写入追加到文件。 但是,这不是CsvTableSource的工作原理。您需要为此自定义TableSource

+0

感谢您的信息@Fabian Hueske –

相关问题