2017-07-01 30 views
0

后也根据从WSO2网站的article当你从此以下两个流JOIN事件发生:西提JOIN事件窗口已过期

当从一个流的事件到达插播加入处理器,它与其他流窗口处理器的所有可用事件相匹配。当找到匹配项时,那些匹配的事件将被发送到查询投影机以创建输出事件;同时,原始事件将被添加到窗口处理器,并且它将保持在那里直到它到期。同样,当一个事件从其窗口处理器到期时,它会与另一个流处理器的所有可用事件进行匹配;当找到匹配项时,那些匹配的事件将被发送到查询投影机以创建输出过期事件。

基本上,一个事件将与其他流的窗口的事件,当它到达再次将寻找新比赛时,它的从窗口过期匹配。但这不是我在我设置的测试环境中注意到的行为。这里是我的查询:

FROM first_names#window.time(1 min) AS fst 
UNIDIRECTIONAL JOIN last_names#window.time(2 min) AS lst 
ON fst.identifier == lst.identifier 
SELECT 
    fst.identifier, 
    fst.firstName, 
    lst.lastName 
INSERT INTO full_names 

然后我发布的顺序以下事件以下列出的各自的流:

{ 
    "lastName": "Colbert", 
    "identifier": 1 
} 

{ 
    "firstName": "Stephen", 
    "identifier": 1 
} 

{ 
    "lastName": "Carell", 
    "identifier": 1 
} 

第二事件到达,其对应已经存在其他流的窗口,所以他们匹配和加入事件正如预期地立即发射:

{ 
    "firstName": "Stephen", 
    "lastName": "Colbert", 
    "identifier": 1 
} 

然后新近到达的事件被存放于该流窗口1分钟

{ 
    "firstName": "Stephen", 
    "identifier": 1 
} 

1分钟高达并且该事件被终止,一个新对应因为它存在于其他流的窗口中:

{ 
    "lastName": "Carell", 
    "identifier": 1 
} 

所以基于这样的文章应该与它新的匹配加盟事件发送以及看起来像:

{ 
    "firstName": "Stephen", 
    "lastName": "Carell", 
    "identifier": 1 
} 

,这一事件永远不会到达,流量不表现为在那篇文章中解释!

任何想法可能会导致这种情况,或者如果该文章的声明是准确的并代表WSO2 Siddhi行为?因为我没有在官方文档或其他文章中看到这个,所以我对此有点怀疑。在此先感谢,我非常感谢您的帮助。

回答

1

上述文章中解释的行为是准确的。这个问题似乎与您的查询。在您的测试查询中,您使用的是INSERT INTO full_names,因此只有当前事件才会发送到该流。但是,加入过期事件的事件将被发送至流,因为过期事件。因此,如果您想要获得那些过期事件,则必须特别使用其输出事件类别(即INSERT EXPIRED EVENTS INTO full_namesINSERT ALL EVENTS INTO full_names)来提及它。请参阅以下documentation以了解有关输出事件类别的更多信息。

+0

非常感谢。但是这带来了一些折衷和跟进担忧。使用'ALL'会导致重复输出。 'msg1'从'S1'到达,如果在'S2'的窗口中有'msg2'的匹配,它会匹配并发出** joined **事件。然后当它到期时,如果'msg2'仍然在'S2'的窗口中,它将再次匹配**并再次发出** joined **事件。如果**不**,它会发出** msg1'的非匹配**版本,即使它在到达时已经找到匹配。用'UNIDIRECTINOAL'和'窗口大小'调整,可以减少这种重复。 – samser

+0

但不是我们想要的行为!我需要来自'S1'的每一个事件;如果在S2中匹配它,** joined **事件就足够了,如果没有匹配,就直接发送事件。但通过'UNIDIRECTIONAL LEFT OUTER JOIN',来自'S1'的**单**事件总是会导致**两个**输出事件。一到达后,一到期后。它可以是“不匹配”和“匹配”,“匹配和不匹配”,“不匹配和不匹配”以及“匹配和匹配”!我想用'EXPIRED EVENTS'这个行为是可以实现的,但它损害了我们流处理的实时性。 – samser

+0

我能想到的唯一的事情就是在这个级别接受**重复**并尝试在**下游**中重复删除!有什么想法吗? – samser