2017-05-15 34 views
0

我使用kafka python客户端将消息推送到Message Hub,但注意到经过一段时间运行我的应用程序后,它将停止向Message Hub发送消息。kafka python - Bluemix MessageHub - ConnectionError:套接字已断开

然后我注意到我的日志文件如下:

ConnectionError: socket disconnected 

我更新了我的代码中加入retries=5

from kafka import KafkaProducer 
from kafka.errors import KafkaError 
import ssl 

sasl_mechanism = 'PLAIN' 
security_protocol = 'SASL_SSL' 

# Create a new context using system defaults, disable all but TLS1.2 
context = ssl.create_default_context() 
context.options &= ssl.OP_NO_TLSv1 
context.options &= ssl.OP_NO_TLSv1_1 

producer = KafkaProducer(bootstrap_servers = app.config['KAFKA_BROKERS_SASL'], 
         sasl_plain_username = app.config['KAFKA_USERNAME'], 
         sasl_plain_password = app.config['KAFKA_PASSWORD'], 
         security_protocol = security_protocol, 
         ssl_context = context, 
         sasl_mechanism = sasl_mechanism, 
         api_version = (0,10), 
         retries=5) 

def send_message(message): 

    try: 
     producer.send(app.config['KAFKA_TOPIC'], message.encode('utf-8')) 

     # FIXME sending seems to be unreliable unless we flush 
     producer.flush() 
    except: 
     print("Unexpected error:", sys.exc_info()[0]) 
     raise 

我的代码从一个Python应用程序烧瓶运行。每一种进入某种类型的请求都会调用send_message()方法。

以下是Bluemix的相关日志行。我可能已经错过了一两个复制和粘贴,但希望它是足够弄清楚发生了什么事:

APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM 
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (4 attempts left). Error: ConnectionError: socket disconnectedMay 15, 2017 8:50:54 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxxMay 15, 2017 8:50:54 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:54 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:54 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:54 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM 
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (2 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM 
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (1 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 Got error produce response on topic-partition TopicPartition(topic='movie_ratings', partition=0), retrying (0 attempts left). Error: ConnectionError: socket disconnected May 15, 2017 8:50:55 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:55 PM 
APP/1 <BrokerConnection host=kafka01-prod01.messagehub.services.us-south.bluemix.net/23.246.202.51 port=9093>: Authenticated as xxxxx May 15, 2017 8:50:55 PM 
APP/1 Node 0 connection failed -- refreshing metadata May 15, 2017 8:50:56 PM 
APP/1 ConnectionError: socket disconnected May 15, 2017 8:50:56 PM 
APP/1 Unable to import 'sasl'. Fallback to 'puresasl'. May 15, 2017 8:51:00 PM 
APP/1 Closing active operation May 15, 2017 8:51:01 PM 

我演讲的题目存在。我完整的客户端代码是在这里:https://github.com/snowch/movie-recommender-demo/blob/effc981cc9f799c41952719619f693172eebcd6a/web_app/app/messagehub_client.py

最欣赏的任何指针...

+0

@ chris.snow将你的Kafka python从1.3.1升级到1.3.3等更新的东西。他们在PR https://github.com/dpkp/kafka-python/pull/1003中修正了一个重大的sasl错误,这导致了我相信。 –

回答

0

Dominic's评论,升级卡夫卡蟒蛇1.3.3解决了该问题对我来说。