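UPDATE: The code works as expected on Python 3.4.4, see my comment below. @kwarunek, when you mentioned in your last comment that the ioloop keeps running, I didn't quite get it, because in my case killing the process sent a cancel to all tasks, which were then woken up. But now I see your point, because cancelling the tasks is not triggered with 3.4.4, while with 3.4.2 it is fine.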
21:28:09,004 [59441] [MainThread:supervisor] CRITICAL failed to pull stats
<killing process>
21:28:11,826 [59441] [MainThread:supervisor] INFO starting while loop
21:28:11,827 [59441] [MainThread:supervisor] INFO launch the delegating coroutine
21:28:11,828 [59441] [MainThread:shutdown] INFO received stop signal
21:28:11,828 [59441] [MainThread:shutdown] INFO <Task finished coro=<pull_stats() done, defined at /opt/blue-python/3.4/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False>
21:28:11,829 [59441] [MainThread:shutdown] INFO cancelling task
21:28:11,829 [59441] [MainThread:supervisor] INFO delegating coroutine finished
21:28:11,829 [59441] [MainThread:supervisor] CRITICAL failed to pull stats
21:28:21,009 [59441] [MainThread:supervisor] INFO starting while loop
21:28:21,010 [59441] [MainThread:supervisor] INFO launch the delegating coroutine
21:28:21,011 [59441] [MainThread:supervisor] INFO delegating coroutine finished
2016-01-30 21:28:21,011 [59441] [MainThread:supervisor] CRITICAL failed to pull stats
While on Python 3.4.2:
21:23:51,015 [10219] [MainThread:supervisor] CRITICAL failed to pull stats
<killing process>
21:23:55,737 [10219] [MainThread:supervisor] INFO starting while loop
21:23:55,737 [10219] [MainThread:supervisor] INFO launch the delegating coroutine
21:23:55,740 [10219] [MainThread:shutdown] INFO received stop signal
21:23:55,740 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,740 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,740 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,741 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,741 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,741 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,741 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,741 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,741 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,741 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,742 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,742 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,742 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,742 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,742 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,742 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,742 [10219] [MainThread:shutdown] INFO <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False>
21:23:55,743 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,743 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,743 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,743 [10219] [MainThread:shutdown] INFO <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False>
21:23:55,744 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,744 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(7)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,744 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,744 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(4)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,745 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,745 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(5)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,745 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,745 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,746 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,746 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,746 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,746 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,747 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,747 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(6)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,747 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,747 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,747 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,747 [10219] [MainThread:shutdown] INFO <Task pending coro=<pull_stats() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:150> wait_for=<_GatheringFuture pending cb=[Task._wakeup()]> cb=[_raise_stop_error() at /usr/lib/python3.4/asyncio/base_events.py:101]>
21:23:55,748 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,748 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(2)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,748 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,748 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,748 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,749 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,749 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,749 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,749 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,749 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,750 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,750 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,750 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,750 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,750 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,751 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,751 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,751 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,751 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,751 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,751 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,752 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,752 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,752 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,752 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,752 [10219] [MainThread:shutdown] INFO <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')>
21:23:55,752 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,752 [10219] [MainThread:shutdown] INFO <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/lib/python3.4/asyncio/tasks.py:582]>
21:23:55,753 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,753 [10219] [MainThread:shutdown] INFO <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]>
21:23:55,753 [10219] [MainThread:shutdown] INFO cancelling task
21:23:55,754 [10219] [MainThread:supervisor] INFO Received CancelledError exception
21:23:55,754 [10219] [MainThread:supervisor] INFO waiting for threads to finish any pending IO tasks
21:23:55,754 [10219] [MainThread:supervisor] INFO closing our asyncio loop
21:23:55,755 [10219] [MainThread:supervisor] INFO exiting with status 0
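The main difference is that the cancel sent by shutdown() does not wake up the tasks, and as a result the while loop is not stopped by the except block that is meant to handle the cancellation. How can I solve this now?
Here is the code: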
def shutdown():
    """Performs a clean shutdown"""
    log.info('received stop signal')
    for task in asyncio.Task.all_tasks():
        log.info(task)
        log.info('cancelling task')
        task.cancel()


def write_file(filename, data):
    """Writes data to a file.

    Returns:
        True if it succeeds, False otherwise.
    """
    try:
        with open(filename, 'w') as file_handle:
            file_handle.write(data.decode())
    except OSError as exc:
        log.critical('failed to write data %s', exc)
        return False
    else:
        log.debug('data saved in %s', filename)
        return True


@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop, executor, timeout):
    """Fetches data from a UNIX socket.

    Sends a command to HAProxy over UNIX socket, reads the response and then
    offloads the writing of the received data to a thread, so we don't block
    this coroutine.

    Arguments:
        socket_file (str): The full path of the UNIX socket file to connect to.
        cmd (str): The command to send.
        storage_dir (str): The full path of the directory to save the response.
        loop (obj): A base event loop from asyncio module.
        executor (obj): A Threader executor to execute calls asynchronously.
        timeout (int): Timeout for the connection to the socket.

    Returns:
        True if statistics from a UNIX socket are saved, False otherwise.
    """
    # try to connect to the UNIX socket
    connect = asyncio.open_unix_connection(socket_file)
    log.debug('connecting to UNIX socket %s', socket_file)
    try:
        reader, writer = yield from asyncio.wait_for(connect, timeout)
    except (ConnectionRefusedError, PermissionError, OSError) as exc:
        log.critical(exc)
        return False
    else:
        log.debug('connection established to UNIX socket %s', socket_file)

    log.debug('sending command "%s" to UNIX socket %s', cmd, socket_file)
    writer.write('{c}\n'.format(c=cmd).encode())
    data = yield from reader.read()
    writer.close()

    if len(data) == 0:
        log.critical('received zero data')
        return False

    log.debug('received data from UNIX socket %s', socket_file)

    suffix = CMD_SUFFIX_MAP.get(cmd.split()[1])
    filename = os.path.basename(socket_file) + suffix
    filename = os.path.join(storage_dir, filename)
    log.debug('going to save data to %s', filename)
    # Offload the writing to a thread so we don't block ourselves.
    result = yield from loop.run_in_executor(executor,
                                             write_file,
                                             filename,
                                             data)

    return result


@asyncio.coroutine
def pull_stats(config, storage_dir, loop, executor):
    """Launches coroutines for pulling statistics from UNIX sockets.

    This is a delegating routine.

    Arguments:
        config (obj): A configParser object which holds configuration.
        storage_dir (str): The absolute directory path to save the statistics.
        loop (obj): A base event loop.
        executor(obj): A ThreadPoolExecutor object.

    Returns:
        True if statistics from *all* UNIX sockets are fetched, False otherwise.
    """
    # absolute directory path which contains UNIX socket files.
    socket_dir = config.get('pull', 'socket-dir')
    timeout = config.getint('pull', 'timeout')
    socket_files = [f for f in glob.glob(socket_dir + '/*')
                    if is_unix_socket(f)]

    log.debug('pull statistics')
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, timeout)
                  for socket_file in socket_files
                  for cmd in CMDS]
    # Launch all connections.
    status = yield from asyncio.gather(*coroutines)

    return len(set(status)) == 1 and True in set(status)


def supervisor(loop, config):
    """Coordinates the pulling of HAProxy statistics from UNIX sockets.

    This is the client routine which launches requests to all HAProxy
    UNIX sockets for retrieving statistics and saves them to the file-system.
    It runs indefinitely until the main program is terminated.

    Arguments:
        loop (obj): A base event loop from asyncio module.
        config (obj): A configParser object which holds configuration.
    """
    dst_dir = config.get('pull', 'dst-dir')
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir')
    executor = ThreadPoolExecutor(max_workers=config.getint('pull', 'workers'))
    exit_code = 1

    while True:
        log.info('starting while loop')
        start_time = int(time.time())
        # HAProxy statistics are stored in a directory and we use retrieval
        # time(seconds since the Epoch) as a name of the directory.
        # We first store them in a temporary place until we receive statistics
        # from all UNIX sockets.
        storage_dir = os.path.join(tmp_dst_dir, str(start_time))

        # If our storage directory can't be created we can't do much, thus
        # abort main program.
        try:
            os.makedirs(storage_dir)
        except OSError as exc:
            msg = "failed to make directory {d}:{e}".format(d=storage_dir,
                                                            e=exc)
            log.critical(msg)
            log.critical('a fatal error has occurred, exiting..')
            break

        try:
            log.info('launch the delegating coroutine')
            result = loop.run_until_complete(pull_stats(config, storage_dir,
                                                        loop, executor))
            log.info('delegating coroutine finished')
        except asyncio.CancelledError:
            log.info('Received CancelledError exception')
            exit_code = 0
            break

        # if and only if we received statistics from all sockets then move
        # statistics to the permanent directory.
        # NOTE: when temporary and permanent storage directory are on the same
        # file-system the move is actually a rename, which is an atomic
        # operation.
        if result:
            log.debug('move %s to %s', storage_dir, dst_dir)
            try:
                shutil.move(storage_dir, dst_dir)
            except OSError as exc:
                log.critical("failed to move %s to %s: %s", storage_dir,
                             dst_dir, exc)
                log.critical('a fatal error has occurred, exiting..')
                break
            else:
                log.info('statistics are stored in %s', os.path.join(
                    dst_dir, os.path.basename(storage_dir)))
        else:
            log.critical('failed to pull stats')
            log.debug('removing temporary directory %s', storage_dir)
            shutil.rmtree(storage_dir)

        # calculate sleep time which is interval minus elapsed time.
        sleep = config.getint('pull', 'pull-interval') - (time.time() -
                                                          start_time)
        if 0 < sleep < config.getint('pull', 'pull-interval'):
            log.debug('sleeping for %.3fs secs', sleep)
            time.sleep(sleep)

    # It is very unlikely that threads haven't finished their job by now, but
    # they perform disk IO operations which can take some time in certain
    # situations, thus we want to wait for them in order to perform a clean
    # shutdown.
    log.info('waiting for threads to finish any pending IO tasks')
    executor.shutdown(wait=True)
    log.info('closing our asyncio loop')
    loop.close()
    log.info('exiting with status %s', exit_code)
    sys.exit(exit_code)


def main():
    """Parses CLI arguments and launches main program."""
    args = docopt(__doc__, version=VERSION)

    config = ConfigParser(interpolation=ExtendedInterpolation())
    # Set defaults for all sections
    config.read_dict(copy.copy(DEFAULT_OPTIONS))
    # Load configuration from a file. NOTE: ConfigParser doesn't warn if user
    # sets a filename which doesn't exist, in this case defaults will be used.
    config.read(args['--file'])

    if args['--print']:
        for section in sorted(DEFAULT_OPTIONS):
            print("[{}]".format(section))
            for key, value in sorted(DEFAULT_OPTIONS[section].items()):
                print("{k} = {v}".format(k=key, v=value))
            print()
        sys.exit(0)
    if args['--print-conf']:
        for section in sorted(config):
            print("[{}]".format(section))
            for key, value in sorted(config[section].items()):
                print("{k} = {v}".format(k=key, v=value))
            print()
        sys.exit(0)

    log.setLevel(getattr(logging, config.get('pull', 'loglevel').upper(),
                         None))
    # Setup our event loop
    loop = asyncio.get_event_loop()

    # Register shutdown to signals
    loop.add_signal_handler(signal.SIGHUP, shutdown)
    loop.add_signal_handler(signal.SIGTERM, shutdown)

    # a temporary directory to store fetched data
    tmp_dst_dir = config['pull']['tmp-dst-dir']
    # a permanent directory to move data from the temporary directory. Data are
    # picked up by the process daemon from that directory.
    dst_dir = config['pull']['dst-dir']
    for directory in dst_dir, tmp_dst_dir:
        try:
            os.makedirs(directory)
        except OSError as exc:
            # errno 17 => file exists
            if exc.errno != 17:
                sys.exit("failed to make directory {d}:{e}".format(d=directory,
                                                                   e=exc))
    supervisor(loop, config)


# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
    main()
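A note on the 3.4.4 log earlier in this post: the only task that shutdown() lists there is a pull_stats() task that is already finished. Calling cancel() on a finished task does nothing and returns False, which would explain why no CancelledError reaches the except block in supervisor(). The minimal sketch below illustrates just that behaviour. It is not the code from the question: it uses the newer async/await syntax and asyncio.run() (Python 3.7+) instead of @asyncio.coroutine, and quick() is a hypothetical stand-in for pull_stats().

```python
import asyncio


async def quick():
    """Hypothetical stand-in for pull_stats() that finishes immediately."""
    return False


async def main():
    task = asyncio.ensure_future(quick())
    await task                       # the task is finished from here on
    accepted = task.cancel()         # cancel() on a finished task is a no-op
    # The task keeps its normal result and is not marked as cancelled.
    return accepted, task.cancelled(), task.result()


outcome = asyncio.run(main())
print(outcome)
```

If shutdown() only ever sees finished tasks, the cancel has to be aimed at something that is still pending, for example the task that drives the whole supervisor.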
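I modified the code as you suggested and it catches the exception, but I still see pull_stats() being woken up when I send the TERM signal. In your code example I don't see this happening. I don't quite understand your point about sleep. Are you implying that sleep can prevent the thread from being stopped? Also, how can I propagate the cancellation to all coroutines so that I can perform cleanup steps there? Many thanks @kwarunek for your answer and for taking the time to provide a code example, much appreciated –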
I have edited the part about SIGTERM, which was also not covered in the example. – kwarunek
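On the sleep question, a hedged sketch may help. Callbacks registered with loop.add_signal_handler() run inside the event loop, so while the program sits in time.sleep() between two run_until_complete() calls, shutdown() cannot run until the loop is started again. One possible way around this, assuming the supervisor can be turned into a coroutine, is to sleep with asyncio.sleep() so the cancel arrives while the loop is running. The sketch uses async/await (Python 3.7+) rather than the style of the question; pull_once() and the log list are hypothetical stand-ins, and a timer replaces the real signal so the example runs anywhere.

```python
import asyncio


async def pull_once(log):
    """Hypothetical stand-in for pull_stats(); the work is a short sleep."""
    await asyncio.sleep(0.01)
    log.append('pulled')


async def supervisor(interval, log):
    """Pull, then wait inside the event loop until the next round."""
    try:
        while True:
            await pull_once(log)
            # asyncio.sleep() suspends this coroutine without blocking the
            # loop, so a cancel() issued by a signal handler lands here.
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        log.append('cancelled')
        return 0                      # exit status for a clean shutdown


async def main(log):
    loop = asyncio.get_running_loop()
    task = asyncio.ensure_future(supervisor(10, log))
    # In a real program (assumption):
    #     loop.add_signal_handler(signal.SIGTERM, task.cancel)
    # Here a timer stands in for the signal.
    loop.call_later(0.1, task.cancel)
    return await task


log = []
exit_code = asyncio.run(main(log))
print(exit_code, log)
```

The design choice here is to cancel one long-lived task instead of iterating over every task, which avoids depending on which tasks happen to exist when the signal arrives.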
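@kwarunek, now it makes sense. So, when a cancel is sent, the task is resumed at the last yield line where the coroutine is currently suspended. In my case, I was at the sleep line when the signal came in: the main thread woke up from the sleep, the while True loop launched all the futures, and then they received the cancel, so the pull_stats and get coroutines were woken up but did not continue because they had been cancelled. I am still trying to come up with a way to catch the cancellation when the program is in the connect/receive/write phase, as I want to perform some cleanup actions. Thanks again for your help. –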
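For the cleanup part, a minimal sketch of one possible approach: wrap the awaiting section of each coroutine in try/except asyncio.CancelledError, do the cleanup, and re-raise. Cancelling a task that is waiting on asyncio.gather() also cancels the gathered children, so each child gets a chance to clean up. The sketch uses async/await (Python 3.7+) rather than the style of the question, the long asyncio.sleep() stands in for the connect/receive/write phase, and the events list is a hypothetical recorder added only for illustration.

```python
import asyncio


async def get(events):
    """Hypothetical stand-in for get(): a slow phase with cleanup on cancel."""
    events.append('connected')
    try:
        await asyncio.sleep(10)       # stands in for connect/read/write
        return True
    except asyncio.CancelledError:
        events.append('cleanup')      # e.g. close the writer, drop temp files
        raise                         # re-raise so the caller sees the cancel


async def pull_stats(events):
    """Delegating coroutine: runs two get() calls concurrently."""
    return await asyncio.gather(get(events), get(events))


async def main(events):
    task = asyncio.ensure_future(pull_stats(events))
    await asyncio.sleep(0.05)         # let both children reach their await
    task.cancel()                     # also cancels the gathered children
    try:
        await task
    except asyncio.CancelledError:
        events.append('supervisor saw cancel')


events = []
asyncio.run(main(events))
print(events)
```

Re-raising matters: if get() swallowed the exception, the gather would treat it as a normal return and the outer coroutine could not tell that a shutdown was requested.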