2016-10-20 50 views

socket.error: timed out (Celery & RabbitMQ running in Docker containers)

I am trying to bring up the official Celery and RabbitMQ Docker containers:

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq 
docker run --link some-rabbit:rabbit --name some-celery -d celery 

Then I checked the logs to make sure everything was fine:

# docker logs some-celery 

[2016-10-20 11:05:50,357: WARNING/MainProcess] /usr/local/lib/python3.5/site-packages/celery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default. 

The pickle serializer is a security concern as it may give attackers 
the ability to execute any command. It's important to secure 
your broker from unauthorized access when using pickle, so we think 
that enabling pickle should require a deliberate action and not be 
the default choice. 

If you depend on pickle then you should set a setting to disable this 
warning and to be sure that everything will continue working 
when you upgrade to Celery 3.2:: 

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] 

You must only enable the serializers that you will actually use. 


    warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) 
[2016-10-20 11:05:50,419: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused. 
Trying again in 2.00 seconds... 

[2016-10-20 11:05:52,430: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@rabbit:5672//: [Errno 111] Connection refused. 
Trying again in 4.00 seconds... 

[2016-10-20 11:05:57,611: WARNING/MainProcess] celery[email protected] ready. 
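
The two "Connection refused" entries, followed by "ready", look like the worker retrying with a growing delay while RabbitMQ was still starting. The same idea can be sketched with the standard library alone. This is an illustration, not Celery's own retry code; the helper name `wait_for_port` and its parameters are hypothetical:

```python
import socket
import time


def wait_for_port(host, port, attempts=5, first_delay=2.0, sleep=time.sleep):
    """Retry a TCP connect, waiting 2s, 4s, 6s, ... between failed attempts."""
    for attempt in range(1, attempts + 1):
        try:
            # Open and immediately close a TCP connection to the broker port.
            socket.create_connection((host, port), timeout=3.0).close()
            return True
        except socket.error:
            if attempt == attempts:
                return False
            # Linearly growing delay: first_delay, 2 * first_delay, ...
            sleep(first_delay * attempt)
    return False
```

Calling `wait_for_port("rabbit", 5672)` before starting a client would block until the broker port accepts connections or the attempts run out.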

On the host I use this tasks.py:

from celery import Celery 

app = Celery('tasks', backend='amqp', broker='amqp://guest:[email protected]/') 

@app.task(name='tasks.add') 
def add(x, y): 
    return x + y 

From the host command line I start Python and get:

>>> from tasks import add 
>>> res=add.delay(4,4) 
Traceback (most recent call last): 
  File "<stdin>", line 1, in <module>
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 461, in delay
    return self.apply_async(args, kwargs)
  File "/Library/Python/2.7/site-packages/celery/app/task.py", line 573, in apply_async
    **dict(self._get_exec_options(), **options)
  File "/Library/Python/2.7/site-packages/celery/app/base.py", line 354, in send_task
    reply_to=reply_to or self.oid, **options
  File "/Library/Python/2.7/site-packages/celery/app/amqp.py", line 310, in publish_task
    **kwargs
  File "/Library/Python/2.7/site-packages/kombu/messaging.py", line 172, in publish
    routing_key, mandatory, immediate, exchange, declare)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 470, in _ensured
    interval_max)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 382, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/Library/Python/2.7/site-packages/kombu/utils/__init__.py", line 246, in retry_over_time
    return fun(*args, **kwargs)
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 250, in connect
    return self.connection
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 756, in connection
    self._connection = self._establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/connection.py", line 711, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Library/Python/2.7/site-packages/kombu/transport/pyamqp.py", line 116, in establish_connection
    conn = self.Connection(**opts)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = self.Transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/connection.py", line 186, in Transport
    return create_transport(host, connect_timeout, ssl)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 299, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/Library/Python/2.7/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: timed out 
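
The last frames show the failure happens while opening the TCP socket, before any AMQP traffic is exchanged, so the broker address itself can be checked independently of Celery. A minimal sketch using only the standard library; the helper name `can_connect` is hypothetical, and the host and port to probe are whatever the broker URL in tasks.py points at:

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) opens within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        sock = socket.create_connection((host, port), timeout=timeout)
    except socket.error:
        # Covers timeouts, refused connections and unreachable hosts.
        return False
    sock.close()
    return True


if __name__ == "__main__":
    # Assumption: replace with the host and port from your own broker URL.
    print(can_connect("localhost", 5672))
```

If this returns False for the address used in the broker URL, the problem is network reachability from the host rather than anything in the task code.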

I saw this question, but the error there seems to be different.

Edit: I have added the RabbitMQ container logs (relevant part):

docker logs some-rabbit 


              RabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: tty
  ######  ##        tty
  ##########
              Starting broker...

=INFO REPORT==== 20-Oct-2016::10:22:41 === 
Starting RabbitMQ 3.6.5 on Erlang 19.0.7 
Copyright (C) 2007-2016 Pivotal Software, Inc. 
Licensed under the MPL. See http://www.rabbitmq.com/ 

=INFO REPORT==== 20-Oct-2016::10:22:41 === 
node   : [email protected] 
home dir  : /var/lib/rabbitmq 
config file(s) : /etc/rabbitmq/rabbitmq.config 
cookie hash : AlUJAQFic5TGBPlUjyyIOw== 
log   : tty 
sasl log  : tty 
database dir : /var/lib/rabbitmq/mnesia/[email protected] 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Memory limit set to 799MB of 1999MB total. 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Disk free limit set to 50MB 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Limiting to approx 1048476 file handles (943626 sockets) 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
FHC read buffering: OFF 
FHC write buffering: ON 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
Database directory at /var/lib/rabbitmq/mnesia/[email protected] is empty. Initialising from scratch... 

=INFO REPORT==== 20-Oct-2016::10:22:42 === 
    application: mnesia 
    exited: stopped 
    type: temporary 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Priority queues enabled, real BQ is rabbit_variable_queue 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Adding vhost '/' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Creating user 'guest' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Setting user tags for user 'guest' to [administrator] 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Setting permissions for 'guest' in '/' to '.*', '.*', '.*' 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_transient: using rabbit_msg_store_ets_index to provide index 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_persistent: using rabbit_msg_store_ets_index to provide index 

=WARNING REPORT==== 20-Oct-2016::10:22:43 === 
msg_store_persistent: rebuilding indices from scratch 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
started TCP Listener on [::]:5672 
completed with 0 plugins. 

=INFO REPORT==== 20-Oct-2016::10:22:43 === 
Server startup complete; 0 plugins started. 

=INFO REPORT==== 20-Oct-2016::10:22:47 === 
accepting AMQP connection <0.353.0> (172.17.0.3:41166 -> 172.17.0.2:5672) 

=INFO REPORT==== 20-Oct-2016::10:22:47 === 
accepting AMQP connection <0.360.0> (172.17.0.3:41168 -> 172.17.0.2:5672) 

Can you access the RabbitMQ server directly? – Jason

@Jason I have added the RabbitMQ logs to the question. – ItayB

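I noticed you are using RabbitMQ as the result backend. If you remove backend='amqp' from the Celery configuration, does this still happen? I wonder whether the task actually executes and the exception is thrown while saving the response to the backend. What happens if you add logging to the task? – Jason

Answer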

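Fixed. I followed this question. I changed the broker IP address in tasks.py to localhost (because the RabbitMQ port is published to the host, the broker behaves like a process listening on localhost):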

from celery import Celery 

app = Celery('tasks', backend='amqp', broker='amqp://[email protected]') 

@app.task(name='tasks.add') 
def add(x, y): 
    return x + y 
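
Because the right broker host depends on where the client runs (localhost on the host once port 5672 is published, the link alias rabbit inside a linked container), one option is to read it from the environment. This is a sketch and not part of the original fix: the variable name `BROKER_HOST` and the helper `broker_url` are hypothetical, and guest/guest are RabbitMQ's default credentials:

```python
import os


def broker_url(host=None, port=5672, user="guest", password="guest"):
    """Build an AMQP broker URL, defaulting the host to localhost."""
    # BROKER_HOST is a hypothetical variable name used for illustration.
    host = host or os.environ.get("BROKER_HOST", "localhost")
    # The trailing "//" selects the default virtual host "/".
    return "amqp://{0}:{1}@{2}:{3}//".format(user, password, host, port)
```

The result could then be passed as `broker=broker_url()` when creating the `Celery` app in tasks.py.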

Then I ran the containers:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management 

docker run -v /Users/user/hostpath:/home/user --link some-rabbit:rabbit --name some-celery -d celery 

and added a celeryconfig.py file to /Users/user/hostpath:

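CELERY_IMPORTS = ('tasks',)  # trailing comma makes this a tuple, not a plain string
CELERY_IGNORE_RESULT = False  # keep task results so they can be fetched
CELERY_RESULT_BACKEND = 'amqp'  # store results in RabbitMQ
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']  # serializers the worker accepts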

and this worked for me :)
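
The deprecation warning in the worker log recommends enabling only the serializers that are actually used. If the tasks only pass JSON-compatible arguments and results (as add does with integers), a narrower variant of celeryconfig.py might look like the sketch below. It uses Celery 3.1-style setting names and assumes nothing else in the project relies on pickle:

```python
# celeryconfig.py (sketch): accept and produce JSON only.
CELERY_IMPORTS = ('tasks',)          # modules the worker imports at startup
CELERY_IGNORE_RESULT = False         # keep task results so they can be fetched
CELERY_RESULT_BACKEND = 'amqp'       # same result backend as above
CELERY_ACCEPT_CONTENT = ['json']     # reject pickle, msgpack and yaml messages
CELERY_TASK_SERIALIZER = 'json'      # serialize task arguments as JSON
CELERY_RESULT_SERIALIZER = 'json'    # serialize return values as JSON
```

The client that calls add.delay() would need the same serializer settings, otherwise its messages would be rejected by the worker.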

Glad you found a solution! – Jason

@Jason Thanks for your support ;-) – ItayB