2017-07-06 63 views
3

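Application using Kafka fails with "Too many open files" error

I am building an application with Kafka and Spark Streaming. The input data comes from a third-party streaming source and is published to a Kafka topic. The code below is from the stream proxy module: it shows how I receive results from the stream and how I send them to the KafkaPublisher (it is only a sketch):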

def on_result_response(self,*args): 
    self.kafkaPublisher.pushMessage(str(args[0])) 

The KafkaPublisher class implements these two methods:

from kafka import KafkaClient, SimpleProducer

class KafkaPublisher:

    def __init__(self,address,port,topic):
        self.kafka = KafkaClient(str(address)+":"+str(port))
        self.producer = SimpleProducer(self.kafka)
        self.topic=topic

    def pushMessage(self,message):
        self.producer.send_messages(self.topic, message)
        self.producer = SimpleProducer(self.kafka, async=True)

The application is launched through this main script:

from StreamProxy import StreamProxy 


streamProxy=StreamProxy("localhost",9092,"task1") 
streamProxy.getStreaming(20) #seconds of streaming 

After processing a few batches (roughly 10 seconds), it raises the following exceptions:

Exception in thread Thread-2354:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files

Exception in thread Thread-2355:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files

Note that many different threads raise an exception with the same message, so the problem is almost certainly in the publisher.

+0

Is 'StreamProxy' something you wrote? If so, could you link the source code of 'getStreaming'?

Answer

2

Try removing this line:

self.producer = SimpleProducer(self.kafka, async=True) 
+1

Thank you, it works perfectly. But why? – RamsesXVII
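
A likely explanation, judging from the traceback: every exception comes from a different thread running `_send_upstream`, which suggests that each asynchronous `SimpleProducer` starts its own background sender thread, and that each of those threads opens its own broker connection (the `connect` frame). Since `pushMessage` builds a new asynchronous producer after every message and never stops the old one, threads and sockets pile up until the process hits its open-file limit. The sketch below reproduces only that pattern, without Kafka. `FakeAsyncProducer`, `LeakyPublisher`, `FixedPublisher` and `extra_threads` are hypothetical names invented for illustration, not part of kafka-python, and the fake producer is an assumption about the behaviour, not the library's actual implementation.

```python
import threading


class FakeAsyncProducer:
    """Hypothetical stand-in for an async producer (NOT the kafka-python class).

    Assumption: like an asynchronous producer, it starts one background
    thread per instance, and that thread lives until stop() is called.
    """

    def __init__(self):
        self._stop = threading.Event()
        # The worker simply blocks until stop() is called.
        self._thread = threading.Thread(target=self._stop.wait, daemon=True)
        self._thread.start()

    def send(self, message):
        pass  # a real producer would queue the message for its worker thread

    def stop(self):
        self._stop.set()
        self._thread.join()


class LeakyPublisher:
    """Mirrors the question: a new producer is created after every message."""

    def __init__(self):
        self.producer = FakeAsyncProducer()
        self.created = [self.producer]  # kept only so the demo can clean up

    def push_message(self, message):
        self.producer.send(message)
        self.producer = FakeAsyncProducer()  # the old producer is never stopped
        self.created.append(self.producer)


class FixedPublisher:
    """Mirrors the answer: the producer from __init__ is reused."""

    def __init__(self):
        self.producer = FakeAsyncProducer()
        self.created = [self.producer]

    def push_message(self, message):
        self.producer.send(message)


def extra_threads(publisher_cls, n_messages):
    """Return how many threads are still alive after n_messages sends."""
    before = threading.active_count()
    publisher = publisher_cls()
    for i in range(n_messages):
        publisher.push_message("msg %d" % i)
    grown = threading.active_count() - before
    for producer in publisher.created:
        producer.stop()  # clean up so repeated runs start from the same state
    return grown


if __name__ == "__main__":
    print(extra_threads(LeakyPublisher, 50))  # 51: one thread per producer created
    print(extra_threads(FixedPublisher, 50))  # 1: the single reused producer
```

With the line removed, the producer created in `__init__` is reused for every message, so the number of threads and connections stays constant no matter how many messages are published.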
