有一对夫妇的事情,我看到:
你最终.map()
缺少括号,这在目前的形式是一个语法错误,但一个微妙的变化可以使它的偶然一个labeled statement而不是返回一个对象。因为在目前的形式下它是一个语法错误,我想这只是这篇文章中的一个错误,而不是你的代码(它甚至不会运行),但仔细检查!
// before
.map(x => { type: COMPLETED, id: x[3] });
// after
.map(x => ({ type: COMPLETED, id: x[3] }));
随着该固定的例子并用一个简单的终极版,可观察运行测试用例:http://jsbin.com/hunale/edit?js,output因此,如果没有什么显着的我做了不同于你,问题似乎是在代码中没有提供。随意添加更多的洞察力,甚至更好,在我们的JSBin/git回购中重现它。你没有提到,但是是非常非常值得一提的
一件事是,在终极版,可观察到的,你的史诗通常是长期存在的“过程管理”。这个史诗将实际上只处理其中一个保存,然后完成(),这可能不是你真正想要的?用户每次启动应用程序只能保存一次吗?似乎不太可能。
取而代之的是,您需要通过将此逻辑封装在mergeMap
之内来保持顶级流式史诗般的返回活动并监听未来的操作。该take(4)
并传递data.id
则成为多余:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from([
Rx.Observable.of(data).mergeMap(data => saveClock(data.id, data.clock)),
Rx.Observable.timer(1000),
Rx.Observable.of(data).mergeMap(data => saveConfig(data.id, data.config))
])
.concatAll()
.toArray()
.map(() => ({ type: COMPLETED, id: data.id }))
);
流的这种分离是由本·莱什在他最近AngularConnect会谈所描述的,在错误的情况下,但它仍然适用:https://youtu.be/3LKMwkuK0ZE?t=20m(不用担心,这是不是角具体)
接下来,我想分享一些不请自来的重构建议,可以让你的生活更容易,但可以肯定这是自以为是可以随意忽略:
我会重构更准确地反映视觉上的事件顺序,以及降低复杂性:
const saveEpic = (action$, store) =>
action$.ofType(SAVE)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
);
在这里,我们就吃下由saveClock
返回的承诺,延缓它是1000毫秒输出,该mergeMapping结果向saveConfig()
呼叫也返回将被消耗的承诺。然后最终将结果映射到我们的COMPLETE
操作。
最后,请记住,如果你的史诗确实活着,并长期生活,没有什么在这个史诗般的,是从接收多个SAVE请求停止,而其他的人仍然在飞行中还是有不但耗尽了请求之间所需的1000ms延迟。即如果确实需要任何请求之间的1000ms空间,那么您的史诗本身并不会完全阻止您的UI代码破坏它。在这种情况下,您可能需要考虑添加更复杂的缓冲机制,例如使用.zip()
运算符和BehaviorSubject
。
http://jsbin.com/waqipol/edit?js,output
const saveEpic = (action$, store) => {
// used to control how many we want to take,
// the rest will be buffered by .zip()
const requestCount$ = new Rx.BehaviorSubject(1)
.mergeMap(count => new Array(count));
return action$.ofType(SAVE)
.zip(requestCount$, action => action)
.mergeMap(data =>
Rx.Observable.from(saveClock(data.id, data.clock))
.delay(1000)
.mergeMap(() => saveConfig(data.id, data.config))
.map(() => ({ type: COMPLETED, id: data.id }))
// we're ready to take the next one, when available
.do(() => requestCount$.next(1))
);
};
这使得它,这样的请求,以保存进来,而我们仍在处理现有的缓冲,而我们只需要其中的一个在一个时间。请记住,这是一个无限制的缓冲区 - 意味着挂起的动作队列可能比缓冲区刷新快得多。这是不可避免的,除非你采用了有损背压的策略,例如丢弃重叠的请求等。
如果您有其他史诗需要重复发送请求超过一秒不超过一次,您需要创建一些排序的单一监督人,为所有史诗般的保证。
这似乎都非常复杂,但也许具有讽刺意味的是,这是很多 RxJS比传统命令代码更容易做到。最难的部分实际上是知道模式。
为什么要移除'take()'防止'toArray()'被调用?那么这个可观察的事物何时完成? 'toArray()'只适用于已完成的观测值,不能返回具有不完整流的值。因此,如果流没有固有的结束(即,如果它被绑定到一个事件监听器等等),那么'toArray'永远不会被调用。但是,当4个项目被发射时,'take()'使流“完成”。 – aaronofleonard