我学习无扩展,我一直在试图找出如果它是这样的任务的匹配一批请求。处理与无扩展
我有加工成批请求作为一个工作单元,并调用回调当所有请求已经完成的进程()方法。
这里最重要的是,每个请求将调用回调函数同步或异步取决于它的实现,并批处理器必须能够同时处理。
但没有线程正在从批处理器开始,任何新的螺纹(或其它异步执行)将被从请求处理程序内如果有必要启动。我不知道这是否符合rx的用例。
我当前工作的代码看起来(几乎)是这样的:
public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
{
IUnitOfWork uow = null;
try
{
uow = unitOfWorkFactory.Create();
var responses = new List<IResponse>();
var outstandingRequests = requests.Count;
foreach (var request in requests)
{
var correlationId = request.CorrelationId;
Action<IResponse> requestCallback = response =>
{
response.CorrelationId = correlationId;
responses.Add(response);
outstandingRequests--;
if (outstandingRequests != 0)
return;
uow.Commit();
onCompleted(responses);
};
requestProcessor.Process(request, requestCallback);
}
}
catch(Exception)
{
if (uow != null)
uow.Rollback();
}
if (uow != null)
uow.Commit();
}
你将如何实现这一点使用RX?这是合理的吗?
注意,这项工作的单位将被同步提交即使有还未返回异步请求。
这的确非常有帮助!感谢您提供非常详尽的答案。唯一让我无法理解的是,当所有请求都得到回应时,我该如何“做些什么”?在zip之后我该怎么说:现在所有的请求都被响应,用这些请求/响应对调用这些方法......我尝试订阅“rqrps”observable,在列表中收集响应并调用方法完成后,但没有成功(在所有这些之后,收集列表中的响应似乎有点奇怪)。我真的很感激:) – asgerhallas
doh! ......称为错误的超载:) – asgerhallas
@asgerhallas - 你不需要等待所有的对子回来 - 只要在他们回来时开始处理它们。否则看看'.ToArray()'可观察的运算符 - 它将'IObservable'变成'IObservable ' - 将n个值的可观察值转换为具有n个元素的一个数组的可观察值,并且只有在成功时。听起来像是您需要的完美运营商。 –
Enigmativity