2015-07-21 41 views
0

我正在测试一个非常简单的CEP查询与外部定时窗口。查询是define stream LoginEvents (timeStamp long, ip string, phone string); @info(name = 'query1') from LoginEvents#window.externalTime(timeStamp,5 sec) select timeStamp, ip insert all events into uniqueIps;;为什么所有事件都是散装的而不是一个一个的?

看看单元测试here,我想会发生什么是回调将被调用9次,5次传入事件和4次过期。相反,它只被调用一次。为什么会这样?我怎样才能达到每个事件都会调用回调的状态?

回答

1

这是我的实现。我不得不复制send方法,因为它是私密的。您可以替换我的时间戳getter实现,因为它对我的用例非常具体。

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.wso2.siddhi.core.event.ComplexEvent; 
import org.wso2.siddhi.core.event.ComplexEventChunk; 
import org.wso2.siddhi.core.event.Event; 
import org.wso2.siddhi.core.event.stream.StreamEvent; 
import org.wso2.siddhi.core.query.output.callback.QueryCallback; 
import java.util.Arrays; 

public abstract class CustomQueryCallback extends QueryCallback { 

    private static final Logger log = LoggerFactory.getLogger(CustomQueryCallback.class); 

    public void receiveStreamEvent(ComplexEventChunk complexEventChunk) { 
     while (complexEventChunk.hasNext()) { 
      ComplexEvent streamEvent = complexEventChunk.next(); 
      Event event = new Event(streamEvent.getOutputData().length).copyFrom(streamEvent); 
      Event[] events = new Event[]{event}; 
      long timestamp = (streamEvent.getType() == StreamEvent.Type.EXPIRED ? streamEvent.getTimestamp() : (long) streamEvent.getOutputData()[2]); 
      if (streamEvent.getType() == StreamEvent.Type.EXPIRED){ 
       send(timestamp, null, events); 
      } else { 
       send(timestamp, events, null); 
      } 
     } 
    } 

    private void send(long timeStamp, Event[] currentEvents, Event[] expiredEvents) { 
     try { 
      receive(timeStamp, currentEvents, expiredEvents); 
     } catch (RuntimeException e) { 
      log.error("Error on sending events" + Arrays.deepToString(currentEvents) + ", " + Arrays.deepToString(expiredEvents), e); 
     } 
    } 

} 
1

这里所有的事件都是没有任何时间延迟地发送给Siddhi的,因此Siddhi一起处理所有这些事件。这就是为什么这些事件是以散装形式返回的。

如果您希望为每个事件调用回调,那么您必须扩展StreamCallback或QueryCallback,并迭代返回的事件数组并调用每个事件的回调。

+0

理想情况下,我希望能够重写send,但这是一种私有方法。 'receiveStreamEvent'是公开的,但有私人领域......有没有其他可能的钩子? – Johnny

+0

目前没有,但我们会将其添加到路线图中,以便用户可以禁用批量事件处理。 – suho

相关问题