编辑: 你基本上是在寻找一个阻塞运算符。旧的阻止操作符(如ForEach
)已被弃用,以支持异步版本。你要等待的最后一个项目,像这样:
public async Task TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Create<int>(async ob =>
{
ob.OnNext(1);
await Task.Delay(1000);
ob.OnNext(2);
await Task.Delay(1000);
ob.OnNext(3);
ob.OnCompleted();
});
observable.Subscribe(i => TestContext.WriteLine($"Sync {i}"));
var selectManyObservable = observable.SelectMany(i => WriteAsync(i).ToObservable()).Publish().RefCount();
selectManyObservable.Subscribe();
await selectManyObservable.LastOrDefaultAsync();
TestContext.WriteLine("Complete.");
}
虽然这将解决您的眼前的问题,它看起来像你要保持运行到由于下面的问题(我增加了两个)。 Rx在正确使用时功能非常强大,如果没有使用,就会让人困惑。
老答案:
有两件事情:
- 。混合异步/等待和Rx通常会导致越来越两者的缺陷和没有好处。
- Rx具有强大的测试功能。你没有使用它。
- 副作用,如
WriteLine
最好在订阅中完全执行,而不是像SelectMany
这样的操作员执行。
- 你可能想要冷却vs热的可观察物。
它没有跑完成的原因是因为你的测试跑步者。您的测试运行人员将在TestMethod1
结束时终止测试。 Rx认购将以其他方式生活。当我在Linqpad运行你的代码,我得到以下的输出:
开始测试...
同步1
同步2
异步1
同步3
异步2
完成。
异步3
...这是我假设你想看到的,除了你可能想在完成异步3
后使用仅Rx,您的代码会是这个样子:
public void TestMethod1()
{
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete."));
}
public IObservable<string> WriteAsync(int value, IScheduler scheduler)
{
return Observable.Return(value)
.Delay(TimeSpan.FromSeconds(1), scheduler)
.Select(i => $"Async {value}");
}
public static class TestContext
{
public static void WriteLine(string s)
{
Console.WriteLine(s);
}
}
这仍然没有利用Rx的测试功能。这看起来像这样:
public void TestMethod1()
{
var scheduler = new TestScheduler();
TestContext.WriteLine("Starting test...");
var observable = Observable.Concat<int>(
Observable.Return(1),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(2),
Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1), scheduler),
Observable.Return(3)
);
var syncOutput = observable
.Select(i => $"Sync {i}");
syncOutput.Subscribe(s => TestContext.WriteLine(s));
var asyncOutput = observable
.SelectMany(i => WriteAsync(i, scheduler));
asyncOutput.Subscribe(s => TestContext.WriteLine(s),() => TestContext.WriteLine("Complete."));
var asyncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(1000.Ms(), "Async 1"),
ReactiveTest.OnNext(2000.Ms(), "Async 2"),
ReactiveTest.OnNext(3000.Ms(), "Async 3"),
ReactiveTest.OnCompleted<string>(3000.Ms() + 1) //+1 because you can't have two notifications on same tick
);
var syncExpected = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0000.Ms(), "Sync 1"),
ReactiveTest.OnNext(1000.Ms(), "Sync 2"),
ReactiveTest.OnNext(2000.Ms(), "Sync 3"),
ReactiveTest.OnCompleted<string>(2000.Ms()) //why no +1 here?
);
var asyncObserver = scheduler.CreateObserver<string>();
asyncOutput.Subscribe(asyncObserver);
var syncObserver = scheduler.CreateObserver<string>();
syncOutput.Subscribe(syncObserver);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
asyncExpected.Messages,
asyncObserver.Messages);
ReactiveAssert.AreElementsEqual(
syncExpected.Messages,
syncObserver.Messages);
}
public static class MyExtensions
{
public static long Ms(this int ms)
{
return TimeSpan.FromMilliseconds(ms).Ticks;
}
}
...所以不像你的任务测试,你不必等待。测试立即执行。您可以将延迟时间提高到几分钟或几小时,TestScheduler
将为您嘲笑时间。然后你的测试运动员可能会很高兴。
你知道你正在创建三个独立的订阅到底层observable吗?这是你的意图吗?还是你想分享三个观察者中单个观察值的值? – Enigmativity
3个订阅? '等待可观察'是否也会导致订阅?在真正的代码中,原始的可观察性很热,所以我可能应该在示例中使它变得很热门...... –
是的,“await observable”确实会导致第三次订阅。您的第二次订阅“observable.SelectMany(i => WriteAsync(i).ToObservable())。Subscribe()'有一个延迟,所以'await observable'在它之前完成。这就是为什么你错过了“Async 3”。 – Enigmativity