2012-03-28 27 views
2

我正在使用Apache HTTPClient 4以默认级别访问连接到Twitter的流式API。它工作得很好的开始,但是检索数据几分钟后捞出与此错误:使用apache httpclient增量处理twitter的流api?

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443] 
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource 
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated. 
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216) 
Make sure to release the connection before allocating another one. 
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190) 

我明白为什么我面对这个问题。我正尝试在水槽集群中使用这个HttpClient作为水槽来源。代码如下所示:

public Event next() throws IOException, InterruptedException { 

    try { 

     HttpHost target = new HttpHost("stream.twitter.com", 443, "https"); 
     new BasicHttpContext(); 
     HttpPost httpPost = new HttpPost("/1/statuses/filter.json"); 
     StringEntity postEntity = new StringEntity("track=birthday", 
       "UTF-8"); 
     postEntity.setContentType("application/x-www-form-urlencoded"); 
     httpPost.setEntity(postEntity); 
     HttpResponse response = httpClient.execute(target, httpPost, 
       new BasicHttpContext()); 
     BufferedReader reader = new BufferedReader(new InputStreamReader(
       response.getEntity().getContent())); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>30000) break; 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } catch (IOException ie) { 
     throw ie; 
    } 

} 

我试图缓冲的响应流30,000个字符到一个StringBuffer,然后返回该所收到的数据。我显然没有关闭连接 - 但我不想关闭它,但我猜。 Twitter的开发者指南谈到这个here它读取:

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.

它清楚地告诉你的HttpClient将逐步返回响应数据。我已经通过了示例和教程,但是我没有发现任何接近这样做的事情。如果你们使用了httpclient(如果不是apache)并逐渐阅读twitter的streaming api,请告诉我你是如何实现这一壮举的。那些没有,请随时为答案作出贡献。 TIA。

UPDATE

我试图这样做:1)I移动获得流句柄水槽源的打开方法。 2)使用一个简单的Inpustream并将数据读入一个字节缓冲区。因此,这里是什么方法体貌似现在:

 byte[] buffer = new byte[30000]; 

     while (true) { 
      int count = instream.read(buffer); 
      if (count == -1) 
       continue; 
      else 
       break; 
     } 
     return new EventImpl(buffer); 

此作品在一定程度上 - 我得到的鸣叫,他们是很好的被写入到目标。问题出在instream.read(buffer)返回值。即使流中没有数据,并且缓冲区具有默认的\ u000000字节和其中的30,000个字节,也会将此值写入目标。所以目标文件看起来像这样..“tweets..tweets..tweeets .. \ u0000 \ u0000 \ u0000 u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u u d uw uw uw uw uw tweets。我知道计数不会返回-1,因为这是一个永不结束的流,所以如何判断缓冲区是否有来自读命令的新内容?

+0

您是否试图捕获#close方法抛出的I/O异常?我相应地更新了我的答案。 – oleg 2012-03-31 09:35:02

+0

另外,\ u0000 \ u0000 ... bytes/null字节不在流中 - 当我实例化一个带有30k个字符的缓冲区时,这些是默认字节,当流内容小于30k个字符时,剩下的字符是空字节。 – Jay 2012-04-03 08:38:39

回答

0

事实证明,这是一个水槽问题。 Flume经过优化以传输大小为32kb的事件。超过32kb的任何内容,Flume都会退出。 (解决方法是调整事件大小大于32KB)。所以,我已经改变了我的代码,至少缓存了20,000个字符。它有点作品,但它不是很好的证明。如果缓冲区长度超过32kb,这仍然可能会失败,但是,在一小时的测试中它仍然没有失败 - 我相信这与Twitter不会在其公共流上发送大量数据有关。

while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>20000) break; 
     } 
0

问题是您的代码泄漏了连接。请确保无论您关闭内容流还是中止请求。

InputStream instream = response.getEntity().getContent(); 
    try { 
     BufferedReader reader = new BufferedReader(
       new InputStreamReader(instream)); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if (buffer.length()>30000) { 
       httpPost.abort(); 
       // connection will not be re-used 
       break; 
      } 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } finally { 
     // if request is not aborted the connection can be re-used 
     try { 
      instream.close(); 
     } catch (IOException ex) { 
      // log or ignore 
     } 
    } 
+0

nope。不工作。 Flume抱怨流已经关闭 - 甚至在开始任何处理之前都排除在外。 – Jay 2012-03-30 02:48:45

+0

#close()方法引发异常,可以安全地忽略。 – oleg 2012-03-30 06:48:52

相关问题