2016-08-10 26 views
9

我正在研究数据流应用程序,并且正在研究使用Apache Flink进行此项目的可能性。其主要原因是它支持非常类似于Java 8的Stream API的高级流构造。如何在Apache Flink的数据库中查找并更新记录的状态?

我将接收与数据库中特定记录对应的事件,并希望能够处理这些事件(来自消息代理,如RabbitMQ或Kafka),并最终更新数据库中的记录,并且将已处理/已转换的事件推送到另一个接收器(可能是另一个消息代理)。

理想情况下,需要按照FIFO顺序处理与特定记录相关的事件(虽然会有时间戳帮助检测乱序事件),但可以并行处理与不同记录相关的事件。我打算使用keyBy()构造来按记录对流进行分区。

需要完成的处理取决于数据库中关于记录的当前信息。但是,我无法找到一个示例或推荐的方法来查询数据库中的这些记录,以丰富正在处理它的事件并用我需要处理的附加信息进行处理。

管道我想到如下:

- > keyBy()上的ID接收 - >从数据库中检索对应于该ID的记录 - >上的记录执行处理步骤 - >推处理的事件转移到外部队列并更新数据库记录

数据库记录将需要更新,因为另一个应用程序将查询数据。

在实现此管道后,可能会进行其他优化。例如,可以将(更新的)记录缓存在受管理状态中,以便同一记录上的下一个事件不需要另一个数据库查询。但是,如果应用程序不知道特定记录,则需要从数据库中检索它。

在Apache Flink中使用这种场景的最佳方法是什么?

回答

4

您可以通过扩展丰富的函数来执行数据库查找。一个RichFlatMap功能,在其open()方法初始化一次数据库连接,然后处理在flatMap()方法每个事件:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> { 

    // Declare DB coonection and query statements 

    @Override 
    public void open(Configuration parameters) throws Exception { 
     // Initialize Database connection 
     // Prepare Query statements 
    } 

    @Override 
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception { 
     // look up the Database, update record, enrich event 
     out.collect(enrichedEvent);   
    } 
}) 

然后你就可以使用DatabaseMapper如下:

stream.keyby(id) 
     .flatmap(new DatabaseMapper()) 
     .addSink(..); 

你可以找到here一个使用Redis缓存数据的例子。

+1

谢谢。那么当Flink以分布式集群模式运行时会发生什么。它是否从集群的每个节点建立连接? – jbx

+0

根据您设置的并行性,您可以有许多'flatmap'实例。每个节点上可能有多个运营商实例(取决于配置的任务槽数量)。 对于每个并行实例,只调用一次'open()'方法,并为每个传入事件调用'flatmap()'。 –

+1

好的,谢谢。我会去的。您有什么机会知道是否可以将Spring集成到Flink中(反之亦然)?有像依赖注入,实体管理器自动布线等。这对使用非常有用,但我似乎无法在网上找到任何有关这样做的人的信息(以及是否有任何陷阱)。 – jbx

相关问题