2016-11-08 98 views
2

我需要多次查询设备。每个查询都需要是异步的,并且设备一次不支持同时查询。 此外,一旦查询,不能立即再次查询。它需要至少1秒的暂停才能正常工作。为什么在完成连接之后不会调用完成?

我的两个查询,由saveClock()saveConfig()执行,返回一个Promise,并通过返回undefined来解决,如预期的那样。

在下面的代码中为什么要删除take()阻止toArray()被调用?
这里发生了什么,是否有更好的方法来实现相同的行为?

export const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .map(action => { 
     // access store and create object data 
     // ... 
     return data; 
    }) 
    .mergeMap(data => 
     Rx.Observable.from([ 
     Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)), 
     Rx.Observable.timer(1000), 
     Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)), 
     Rx.Observable.of(data.id) 
    ]) 
    ) 
    .concatAll() 
    .take(4) 
    .toArray() 
    // [undefined, 0, undefined, "id"] 
    .map(x => { type: COMPLETED, id: x[3] }); 
+1

为什么要移除'take()'防止'toArray()'被调用?那么这个可观察的事物何时完成? 'toArray()'只适用于已完成的观测值,不能返回具有不完整流的值。因此,如果流没有固有的结束(即,如果它被绑定到一个事件监听器等等),那么'toArray'永远不会被调用。但是,当4个项目被发射时,'take()'使流“完成”。 – aaronofleonard

回答

2

有一对夫妇的事情,我看到:

你最终.map()缺少括号,这在目前的形式是一个语法错误,但一个微妙的变化可以使它的偶然一个labeled statement而不是返回一个对象。因为在目前的形式下它是一个语法错误,我想这只是这篇文章中的一个错误,而不是你的代码(它甚至不会运行),但仔细检查!

// before 
.map(x => { type: COMPLETED, id: x[3] }); 

// after 
.map(x => ({ type: COMPLETED, id: x[3] })); 

随着该固定的例子并用一个简单的终极版,可观察运行测试用例:http://jsbin.com/hunale/edit?js,output因此,如果没有什么显着的我做了不同于你,问题似乎是在代码中没有提供。随意添加更多的洞察力,甚至更好,在我们的JSBin/git回购中重现它。你没有提到,但是是非常非常值得一提的


一件事是,在终极版,可观察到的,你的史诗通常是长期存在的“过程管理”。这个史诗将实际上只处理其中一个保存,然后完成(),这可能不是你真正想要的?用户每次启动应用程序只能保存一次吗?似乎不太可能。

取而代之的是,您需要通过将此逻辑封装在mergeMap之内来保持顶级流式史诗般的返回活动并监听未来的操作。该take(4)并传递data.id则成为多余:

const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .mergeMap(data => 
     Rx.Observable.from([ 
     Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)), 
     Rx.Observable.timer(1000), 
     Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config)) 
     ]) 
     .concatAll() 
     .toArray() 
     .map(() => ({ type: COMPLETED, id: data.id })) 
    ); 

流的这种分离是由本·莱什在他最近AngularConnect会谈所描述的,在错误的情况下,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m(不用担心,这是不是角具体)

接下来,我想分享一些不请自来的重构建议,可以让你的生活更容易,但可以肯定这是自以为是可以随意忽略:

我会重构更准确地反映视觉上的事件顺序,以及降低复杂性:

const saveEpic = (action$, store) => 
    action$.ofType(SAVE) 
    .mergeMap(data => 
     Rx.Observable.from(saveClock(data.id, data.clock)) 
     .delay(1000) 
     .mergeMap(() => saveConfig(data.id, data.config)) 
     .map(() => ({ type: COMPLETED, id: data.id })) 
    ); 

在这里,我们就吃下由saveClock返回的承诺,延缓它是1000毫秒输出,该mergeMapping结果向saveConfig()呼叫也返回将被消耗的承诺。然后最终将结果映射到我们的COMPLETE操作。

最后,请记住,如果你的史诗确实活着,并长期生活,没有什么在这个史诗般的,是从接收多个SAVE请求停止,而其他的人仍然在飞行中还是有不但耗尽了请求之间所需的1000ms延迟。即如果确实需要任何请求之间的1000ms空间,那么您的史诗本身并不会完全阻止您的UI代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲机制,例如使用.zip()运算符和BehaviorSubject

http://jsbin.com/waqipol/edit?js,output

const saveEpic = (action$, store) => { 
    // used to control how many we want to take, 
    // the rest will be buffered by .zip() 
    const requestCount$ = new Rx.BehaviorSubject(1) 
    .mergeMap(count => new Array(count)); 

    return action$.ofType(SAVE) 
    .zip(requestCount$, action => action) 
    .mergeMap(data => 
     Rx.Observable.from(saveClock(data.id, data.clock)) 
     .delay(1000) 
     .mergeMap(() => saveConfig(data.id, data.config)) 
     .map(() => ({ type: COMPLETED, id: data.id })) 
     // we're ready to take the next one, when available 
     .do(() => requestCount$.next(1)) 
    ); 
}; 

这使得它,这样的请求,以保存进来,而我们仍在处理现有的缓冲,而我们只需要其中的一个在一个时间。请记住,这是一个无限制的缓冲区 - 意味着挂起的动作队列可能比缓冲区刷新快得多。这是不可避免的,除非你采用了有损背压的策略,例如丢弃重叠的请求等。

如果您有其他史诗需要重复发送请求超过一秒不超过一次,您需要创建一些排序的单一监督人,为所有史诗般的保证。

这似乎都非常复杂,但也许具有讽刺意味的是,这是很多 RxJS比传统命令代码更容易做到。最难的部分实际上是知道模式。

+0

对不起括号,只是我在问题代码中的一个错误。不过非常感谢您指出了多笔储蓄的问题。虽然我没有想过一个长寿的史诗,但我确实阻止了用户调度多个“SAVE”操作。一旦分配了'SAVE'动作,UI将不允许再次保存,直到分派'COMPLETE'动作。尽管如此,我仍然觉得背压机制非常有趣,而且双重检查将非常有用。幸运的是,我没有其他史诗来查询设备,因此我不需要任何机制来防止这种情况。 –

+0

保持史诗般的生命似乎解决了我的问题。谢谢! –

+0

不客气! – jayphelps

相关问题