2017-08-26 127 views
0

说我有一个观察到的,看起来像这样(这是Python,但应该是通用于所有语言):配料结果可观察

rx.Observable.from_iterable([[1],[],[2],[3],[],[],[4],[5,6],[7],[8,9],[10]])

我希望最终能批量整数到长度为5的列表和能够传递到那些功能,所以这样的事情:

batch_function([1,2,3,4,5]) 
batch_function([6,7,8,9,10]) 

在现实中,输入的数据将是(可能是空的)列表的一个inifinite流。我只想确保在我累积了5个实际值之前,我的后续调用batch_function不会完成。谢谢你的帮助。

+0

您是否需要最后一个5元素列表流?只要将观察者订阅到流中并在观察者中手动缓冲,只要缓冲区超过5个元素就调用'batch_function'就足够了吗? – concat

+0

@concat我刚刚发布了使用'buffer_with_count'的工作。我需要该列表,因为将批次发送到下游功能更有效。 – rumdrums

回答

0

以下代码段正在为我使用buffer_with_count。我不确定是否有更简洁的方法来做到这一点,即没有flat_map

BUFFER_COUNT=5 
rx.Observable.from_iterable(iter(get_items())) \ 
    .flat_map(lambda it: it) \ 
    .buffer_with_count(BUFFER_COUNT) \ 
    .map(lambda my_array: do_something_with(my_array)) \ 
    .subscribe(lambda it: print(it))