我打算把这个答案分成几个部分,因为这里有很多事情要做。
我想知道是否有办法在NiFi的流文件中发送带有 属性的空流文件?我想用这个作为触发器 来指示一种事件类型已经开始。
GenerateFlowFile
处理器允许您以常规运行计划或使用CRON计划发送空的(或填充的)流文件。您可以将其与UpdateAttribute
处理器结合使用,为流文件添加任意静态或动态属性。
在NiFi中是否有任何其他方式可以指示一组事件 已启动并完成?例如,如果我有三个处理器 读入数据,我想知道第一个处理器是 即将被触发,并且最后一个处理器已完成。无论如何,我有 吗?
这接近批处理,Apache NiFi并没有设计或优化。确定一个源处理器是“即将被触发”是非常困难的。如果该处理器是以定时器/ CRON为基础触发的,则可以知道该时间,但如果您的意思是“GetFile
即将成功检索文件”,那么这并不容易。可以使用自己的客户处理器扩展处理器,并覆盖onTrigger()
方法,以便在另一个处理器可以接受的DistributedMapCacheClientService
中存储某些值。或者我想你可以将逻辑包装在ExecuteScript
处理器中并编写自定义通知代码。我不确定的目标在这里 - 谁得知这个状态变化通知?它是另一个处理器,人类观察者还是外部服务?
如果处理器继续运行,我希望能够将从处理器1读取到处理器3的数据分组为 。为了使 这更清楚
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2 Processor3 End ...
不过,我相信你所要求的是可能与使用新Wait
和Notify
处理器。 Koji Kawamura写了一篇很好的文章描述他们的使用here。
我认为在这种情况下,你需要特殊的内容或属性能够检测通过该系统传来的批次,除非它是数据的一个单元在同一时间。我会尽量在下面描述两种情况,但我没有太多的上下文。
方案1(数据的单一单位)
随意替换为不同的源的处理器,但我使用GetFile
为了简单起见。
假设你有一个目录的完整文本文件(通过一些外部进程放在那里)。每个文件都以“名字姓氏”的形式包含文本,并且其名称为Lastname_YYYY-MM-DD-HH-mm-ss.txt
,其中写入的时间戳填充文件名。
GetFile -> ReplaceText -> PutFile
GetFile
处理器会将每个文件作为单独的流文件引入。从那里,ReplaceText
可以做一些简单的事情,比如使用正则表达式切换名称的顺序,PutFile
将内容写回到文件系统。当GetFile
被触发第一次,它将派遣ň flowfiles到连接/队列ReplaceText
。如果你想让它等待和并行执行的操作,而不是线性的,你可以在成功队列中的回压设定为1
flowfile防止以上的处理器(GetFile
)运行,直到队列为空一次。
方案2(多flowfiles必须被组合在一起,并结合手术)
这里,你会想用MergeContent
收集多个flowfiles到一个单一的一个。您可以在bin门槛设置为ñ flowfiles,当它达到传入flowfiles的最小数量的MergeContent
处理器将只传输一个成功 flowfile。您也可以按属性进行分类,因此,如果您从异构输入源读取数据,则仍然可以根据共同特征关联相关联的数据片段。
与Wait
& Notify
可选场景,此外,还可以使用Notify
处理器触发flowfile发送到相应的Wait
处理器“释放”的“内容” flowfiles他们所需的目的地。同样,Koji的文章上面链接了一个示例流程和一些截图详细解释了这一点。
我希望这至少给你一个方向可循。没有更多的上下文,我仍然可以感受到你正试图在这里解决非NiFi问题,或者也许可以调整你的数据流模型来更好地支持流式思路。如果您有更多信息,我很乐意扩大答案。
非常感谢您抽出时间给出详细的回复 - 无法告诉您我多么感激!你的答案非常清晰,易于理解。我想我正在寻找的是等待和通知 - 我肯定会更多地探索这个选项,谢谢你指出这一点! – BigBug