2017-05-17 39 views
2

NiFi新功能!使用NiFi流文件作为事件通知器

我想知道是否有办法发送一个空流文件的属性在NiFi的流文件?我想用这个作为触发器来表明一种事件类型已经开始。

在NiFi中是否有任何其他方式可以指示一组事件已经启动并完成?例如,如果我有三个处理器读入数据,我想知道第一个处理器即将被触发并且最后一个处理器已完成。无论如何对我来说这样做?如果处理器继续运行,我希望能够将从处理器1读取的数据一次分组到处理器3。为了使这更清楚

Begin 
Processor1 
Processor2 
Processor3 
End 
Begin 
Processor1 
Processor2 
Processor3 
End 
... 

任何帮助,将不胜感激, 提前感谢!

回答

8

我打算把这个答案分成几个部分,因为这里有很多事情要做。

我想知道是否有办法在NiFi的流文件中发送带有 属性的空流文件?我想用这个作为触发器 来指示一种事件类型已经开始。

GenerateFlowFile处理器允许您以常规运行计划或使用CRON计划发送空的(或填充的)流文件。您可以将其与UpdateAttribute处理器结合使用,为流文件添加任意静态或动态属性。

在NiFi中是否有任何其他方式可以指示一组事件 已启动并完成?例如,如果我有三个处理器 读入数据,我想知道第一个处理器是 即将被触发,并且最后一个处理器已完成。无论如何,我有 吗?

这接近批处理,Apache NiFi并没有设计或优化。确定一个处理器是“即将被触发”是非常困难的。如果该处理器是以定时器/ CRON为基础触发的,则可以知道该时间,但如果您的意思是“GetFile即将成功检索文件”,那么这并不容易。可以使用自己的客户处理器扩展处理器,并覆盖onTrigger()方法,以便在另一个处理器可以接受的DistributedMapCacheClientService中存储某些值。或者我想你可以将逻辑包装在ExecuteScript处理器中并编写自定义通知代码。我不确定的目标在这里 - 谁得知这个状态变化通知?它是另一个处理器,人类​​观察者还是外部服务?

如果处理器继续运行,我希望能够将从处理器1读取到处理器3的数据分组为 。为了使 这更清楚

Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...

不过,我相信你所要求的是可能与使用新WaitNotify处理器。 Koji Kawamura写了一篇很好的文章描述他们的使用here

我认为在这种情况下,你需要特殊的内容或属性能够检测通过该系统传来的批次,除非它是数据的一个单元在同一时间。我会尽量在下面描述两种情况,但我没有太多的上下文。

方案1(数据的单一单位)

随意替换为不同的源的处理器,但我使用GetFile为了简单起见。

假设你有一个目录的完整文本文件(通过一些外部进程放在那里)。每个文件都以“名字姓氏”的形式包含文本,并且其名称为Lastname_YYYY-MM-DD-HH-mm-ss.txt,其中写入的时间戳填充文件名。

GetFile -> ReplaceText -> PutFile 

GetFile处理器会将每个文件作为单独的流文件引入。从那里,ReplaceText可以做一些简单的事情,比如使用正则表达式切换名称的顺序,PutFile将内容写回到文件系统。当GetFile被触发第一次,它将派遣ň flowfiles到连接/队列ReplaceText。如果你想让它等待和并行执行的操作,而不是线性的,你可以在成功队列中的回压设定为1 flowfile防止以上的处理器(GetFile)运行,直到队列为空一次。

方案2(多flowfiles必须被组合在一起,并结合手术)

这里,你会想用MergeContent收集多个flowfiles到一个单一的一个。您可以在bin门槛设置为ñ flowfiles,当它达到传入flowfiles的最小数量的MergeContent处理器将只传输一个成功 flowfile。您也可以按属性进行分类,因此,如果您从异构输入源读取数据,则仍然可以根据共同特征关联相关联的数据片段。

Wait & Notify

可选场景,此外,还可以使用Notify处理器触发flowfile发送到相应的Wait处理器“释放”的“内容” flowfiles他们所需的目的地。同样,Koji的文章上面链接了一个示例流程和一些截图详细解释了这一点。

我希望这至少给你一个方向可循。没有更多的上下文,我仍然可以感受到你正试图在这里解决非NiFi问题,或者也许可以调整你的数据流模型来更好地支持流式思路。如果您有更多信息,我很乐意扩大答案。

+1

非常感谢您抽出时间给出详细的回复 - 无法告诉您我多么感激!你的答案非常清晰,易于理解。我想我正在寻找的是等待和通知 - 我肯定会更多地探索这个选项,谢谢你指出这一点! – BigBug