2016-05-23 45 views
1

我有研究和阅读文件,他们不是很容易理解。 我想实现的是以下功能:弹簧反应堆项目的反应流背压

我正在使用Spring Reactor项目并使用eventBus。我的事件总线正在向模块A投掷事件。

模块A应接收事件并插入到将保存唯一值的热流中。每250个Milisecons溪流应拉动所有的价值,并在他们身上进行冥想......等等。

例如: 的eventBus抛出事件与数:1,2,3,2,3,2

的流应该得到和保持独特的价值观 - > 1,2,3 250之后毫秒该流应打印数字和空值

任何人有一个想法如何开始?我试过这些例子,但没有任何真正的工作,我想我不明白的东西。任何人都有一个例子?

TNX

编辑:

当试图做下我总是得到异常:

 Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS); 

     s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); 

     for (int i = 0; i < 10000; i++) { 
      p.onNext(i); 
     } 

例外:

java.lang.IllegalStateException: The environment has not been initialized yet 
    at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?] 

正如你可以看到我无法继续尝试而不通过这个问题。

BY THE WAY:发生这种情况只有当我使用buffer(1, TimeUnit.SECONDS)如果我使用buffer(50)例如它的工作..虽然这不是最终的解决方案,它的一个开始。

回答

0

再次阅读文档后,嗯,我错过了这一点:

static { 
     Environment.initialize(); 
    } 

这解决了这个问题。 Tnx