2012-03-22 108 views
1

我有一个从网上下载csv文件的功能。我需要扩展它,以便对2个网站依次进行2个呼叫。但是我不知道该怎么办呢?Rx Observable - 链接

继承人的功能:

// Define other methods and classes here 
private void GetCSVData(string url1, string url2) 
{ 
    WebClient webClient = null; 
    try 
    { 
     webClient = new WebClient(); 

     var task = Observable.FromEventPattern 
      <OpenReadCompletedEventHandler, OpenReadCompletedEventArgs> 
     (
      ev => webClient.OpenReadCompleted += ev, 
      ev => webClient.OpenReadCompleted -= ev 
     ); 

     // needs to be redone 
     task.Subscribe(t => ParseCSV1(t.EventArgs.Result)); 

     // call ParseCSV1() 
     // then call ParseCSV2() 

     // needs redone, 2 calls to 2 website 
     webClient.OpenReadAsync(new Uri(url1));  
    } 
    catch (WebException wex) 
    { 
     System.Diagnostics.Debug.WriteLine(wex.ToString()); 
    } 
    catch (Exception ex) 
    { 
     System.Diagnostics.Debug.WriteLine(ex.ToString()); 
    } 
} 

private void ParseCSV1(Stream stream) 
{ 
    // Parse steps... 
} 

private void ParseCSV2(Stream stream) 
{ 
    // Parse steps... 
} 
+1

您的代码似乎不完整。你有'url1'&'url2'进来,但你用'url'调用'OpenReadAsync'。你的代码目的不清楚。你能改善这个问题吗? – Enigmativity 2012-03-23 00:12:03

+1

将2个网址映射到2个可观察对象,将这2个可观察对象连接以获得单个可观察对象并订阅它 – Ankur 2012-03-25 12:38:58

回答

1

你有什么应该是好的。如果您再次致电OpenReadAsync,那么第二个事件将触发(每个URL一个),您的订阅将处理这两个事件。

您应该简单地能够做到以下几点:

// Call both, will generate two events. 
webClient.OpenReadAsync(new Uri(url1)); 
webClient.OpenReadAsync(new Uri(url2)); 

不过,我怀疑你有一些生命周期的问题,因为你走出去的范围与WebClient(它从来没有妥善处置)。

0

我......在这里可能不会使用Rx。实际上,这听起来更适合任务库;这里是我可能会做的事情:

void Main() 
{ 
    var webClient = new System.Net.WebClient(); 

    Task.Factory 
     .StartNew(() => GetUrl("http://www.google.com")) 
     .ContinueWith(tsk => ParseCsv(CheckAndHandleError(tsk))) 
     .ContinueWith(tsk => 
      { 
       if(CheckAndHandleError(tsk)) 
        return GetUrl("http://www.yahoo.com"); 
       else 
        return string.Empty; 
      }) 
     .ContinueWith(tsk => ParseCsv(CheckAndHandleError(tsk))); 

} 

private T CheckAndHandleError<T>(Task<T> task) 
{ 
    if(task.IsFaulted) 
    { 
     // you'll need to handle the tsk.Exception here 
     throw new NotImplementedException(); 
    } 
    else 
    { 
     return task.Result; 
    } 
} 

private string GetUrl(string address) 
{ 
    using(var client = new WebClient()) 
    { 
     var stream = client.OpenRead(address); 
     using(var rdr = new StreamReader(stream)) 
     { 
      return rdr.ReadToEnd(); 
     } 
    } 
} 

private bool ParseCsv(string csvText) 
{ 
    // whatever this is 
    Console.WriteLine(csvText); 
    return true; 
} 
相关问题