2017-09-16 30 views
0

我想在Windows 10上使用Apache Flink 1.3.2与Java 1.8.0_144 IDE Eclipse Mars实现日志分析器。是不是允许查​​询Apache Flink Table API中的POJO数据集的超类型

语境:

  • 有多种类型的LogMessage的。
  • 为每种类型创建POJO。
  • 为每种类型创建POJO类型的DataSet实例。
  • 然后使用表API进行查询,如下所示。

这工作正常。

DataSet<String> rawLogs = env.readTextFile(input);// input is the data file path 
DataSet<FirstBackupMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction()); 
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); 
Table LogMessageTable = tableEnv.fromDataSet(logMsgPOJODataSet); 
Table result = tableEnv .sql("Select taskId from " + LogMessageTable); 
tableEnv.toDataSet(result, Row.class).print(); 

要求: 我尝试使用工厂模式来概括这个实现。 为了做到这一点我尝试将POJO类泛化到LogMessage 接口。在上述情况下:

public class FirstBackupMessage implements LogMessage 
similarly 
public class SecondBackupMessage implements LogMessage 
public class ThirdBackupMessage implements LogMessage 

在MapFunction实现我填充特定的类实例,但map函数的输出映射到即的LogMessage 在上述情况下一般引用这将是

DataSet<LogMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction()); 
//the LogMapFunction.map method is populating FirstBackupMessage 

在此之后,如果我尝试查询POJO FirstBackupMessage中存在的字段,但现在参考接口(即LogMessage ),它会抛出异常,表示我查询的字段未找到。

奇怪的是,如果我打印与通用参考DataSet中即logMsgPOJODataSet.print()它打印在这种情况下FirstBackupMessage具体POJO的所有字段。

问题: 是这样的转换为DataSet的泛型引用是不允许/可用Flink Table APIs?

+0

这是OO的核心!在Java变量中有一个类型,并且只能在取消引用此变量时才能访问该_declared type_的成员。另一方面,在运行时,这个变量可以引用任何子类型的实例,并且方法调用(即'toString')由这个_runtime type_确定。 – Seelenvirtuose

+0

感谢您的回应!是的,你是对的,我的不好。这回答了为什么toString打印所有值的第二部分。但是接下来的问题是(或者应该)如何在运行时在flink数据集或Dayastream API中将其转换回子类型。 –

回答

0

Table API/SQL库在关系表上运行。通过调用TableEnvironment.fromDataSet(logMsgPOJODataSet),将DataSetlogMsgPOJODataSet逻辑转换为表格。在此过程中,需要根据logMsgPOJODataSetDataSet的类型来识别新表的模式。 Flink的DataSet API使用TypeInformation来确定DataSet的数据类型。

由于logMsgPOJODataSetDataSet的类型为LogMessage,因此表API不知道其任何子类型。因此,包含LogMessage的所有字段,但不包含子类型字段。

在任何情况下,都不可能在同一个表中处理不同类型的行。所有行必须具有相同的模式。处理这种情况的两种方法是:

  1. 使架构成为所有子类型的超集,并且对于不支持的类型具有空值。也许添加另一个字段,指示子类型。
  2. 添加保存所有子类型数据的通用Map<String, String>字段。

在这两种情况下,都需要使用DataSet API完成转换,例如使用MapFunction

相关问题