2016-09-17

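How does threading.join() detect a timeout?

We are running a large Python code that randomly scans the parameter space of some physics models (so giving a simple example is very difficult, sorry). Evaluating one parameter point takes about 300 ms, but sometimes (I don't know why) an evaluation suddenly takes several hours, which kills our CPU budget on the computing cluster.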

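So my idea was to use threading to give the evaluation of each parameter point a maximum run time. If an evaluation takes longer, I can simply ignore that point, since it is impractical anyway. However, this doesn't seem to work. I start the computation in a new thread and join it from the main thread with a timeout of, say, 1 second, but the main thread still waits for the computation to finish (which takes much longer than 1 second).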
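A minimal sketch of the setup described above, for reference. `evaluate_point` and `time_join` are hypothetical names, and the evaluation is assumed to be a plain `time.sleep`; whether compiled nlopt/numpy/scipy code behaves the same way is exactly the open question and is not shown here. The worker is created with `daemon=True` because Python waits for non-daemon threads when the program exits.

```python
import threading
import time


def evaluate_point(duration):
    """Hypothetical stand-in for one parameter-point evaluation."""
    time.sleep(duration)


def time_join(duration, timeout):
    """Run evaluate_point in a new thread, join it with a timeout and report
    what join() returned, how long the main thread waited, and whether the
    worker is still running afterwards."""
    # daemon=True: a daemon thread does not keep the interpreter alive at
    # exit (non-daemon threads are waited for when the program ends).
    worker = threading.Thread(target=evaluate_point, args=(duration,), daemon=True)
    worker.start()
    start = time.monotonic()
    returned = worker.join(timeout)
    waited = time.monotonic() - start
    return returned, waited, worker.is_alive()
```

Under these assumptions, `time_join(3.0, 1.0)` should come back after roughly one second with `None` as the return value and `True` for "still running": `join()` stops waiting, but it does not report that, and the worker keeps going in the background.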

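How is this possible? How does threading measure how long the new thread has already been running? I should add that I make heavy use of nlopt, numpy and scipy when evaluating a parameter point. As far as I know, most of these are not written directly in Python but use compiled binaries to speed up the computation. Could this affect threading (since these functions are a "black box" to it)?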

Thanks!


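Have you read the [`join()` documentation](https://docs.python.org/3.5/library/threading.html#threading.Thread.join)? Quote: *As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.* – Bakuriu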
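A sketch of the idiom from the quoted documentation, assuming the evaluation can be wrapped in a function. `run_with_timeout` and `slow_evaluation` are hypothetical names; the result is passed out through a dict because `Thread` itself has no return value.

```python
import threading
import time


def run_with_timeout(func, args=(), timeout=1.0):
    """Run func(*args) in a daemon thread and return (timed_out, result).

    On timeout the worker is NOT stopped: threads cannot be killed, so it
    keeps running in the background until func returns.
    """
    result = {}

    def target():
        result["value"] = func(*args)

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout)        # always returns None, timed out or not
    if worker.is_alive():       # still running -> the join() timed out
        return True, None
    # Finished in time (if func raised, there is no value and None is returned).
    return False, result.get("value")


def slow_evaluation(x):
    """Hypothetical evaluation that 'hangs' for large x."""
    time.sleep(5.0 if x > 10 else 0.01)
    return x * x
```

On timeout the abandoned thread still runs, and still uses CPU, until the function returns; this is the limitation the following comment addresses.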


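Also: the standard interface provides no way to kill a thread. You'd be better off using *multiprocessing*, where a worker process is much easier to kill. – Bakuriu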
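A sketch of the *multiprocessing* variant suggested here. It assumes a POSIX system where the `"fork"` start method exists, and `slow_evaluation` and `evaluate_with_timeout` are hypothetical names. The result comes back through a one-way `Pipe`. `poll(timeout)` waits for the result, and if nothing arrives in time the child process is terminated.

```python
import multiprocessing
import time


def slow_evaluation(x, conn):
    """Hypothetical evaluation that 'hangs' for large x; sends its result
    back to the parent through `conn`."""
    time.sleep(5.0 if x > 10 else 0.01)
    conn.send(x * x)
    conn.close()


def evaluate_with_timeout(x, timeout):
    """Return the result, or None if it took longer than `timeout` seconds.

    Assumes a POSIX system where the "fork" start method is available.
    """
    ctx = multiprocessing.get_context("fork")
    recv_end, send_end = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=slow_evaluation, args=(x, send_end))
    proc.start()
    send_end.close()                   # the parent only reads
    result = None
    try:
        if recv_end.poll(timeout):     # True once data (or EOF) is available
            result = recv_end.recv()
        else:
            proc.terminate()           # timed out: unlike a thread, a process can be killed
    except EOFError:
        result = None                  # child exited without sending a result
    finally:
        proc.join()
        recv_end.close()
    return result
```

On platforms that use the spawn start method (such as Windows), drop the `"fork"` context and put the calling code under an `if __name__ == "__main__":` guard.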

Answer


Short answer:

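I don't think `threading.join` tells you whether the timeout was hit: it always returns `None`. You have to check whether it timed out yourself.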

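In any case, a minimal code snippet would help to arrive at a working solution. This is mostly guesswork, but if the main process doesn't check for the timeout, it will just keep going.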

Longer answer:

Following where the `timeout` parameter goes:

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1060

```python
self._wait_for_tstate_lock(timeout=max(timeout, 0))
```

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L1062-L1074

```python
def _wait_for_tstate_lock(self, block=True, timeout=-1):
    # Issue #18808: wait for the thread state to be gone.
    # At the end of the thread's life, after all knowledge of the thread
    # is removed from C data structures, C code releases our _tstate_lock.
    # This method passes its arguments to _tstate_lock.acquire().
    # If the lock is acquired, the C code is done, and self._stop() is
    # called. That sets ._is_stopped to True, and ._tstate_lock to None.
    lock = self._tstate_lock
    if lock is None:  # already determined that the C code is done
        assert self._is_stopped
    elif lock.acquire(block, timeout):
        lock.release()
        self._stop()
```

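If there is no lock, the thread is guaranteed to have stopped. Otherwise the lock is acquired with the given `block` and `timeout` arguments.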
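To make the mechanism concrete, here is a hypothetical toy model (not CPython's code) of what `_wait_for_tstate_lock()` relies on. A plain lock is held for as long as the work runs and released when the work ends, so "join with a timeout" amounts to "acquire that lock with a timeout". `SentinelWorker` is an illustrative name.

```python
import threading
import time


class SentinelWorker:
    """Hypothetical toy model of how Thread.join(timeout) waits.

    A plain lock is held for the worker's whole lifetime and released when
    the work is done; join() simply tries to acquire that lock with the
    given timeout, like _wait_for_tstate_lock() does with _tstate_lock.
    """

    def __init__(self, func, *args):
        self._func = func
        self._args = args
        self._sentinel = threading.Lock()
        self._sentinel.acquire()          # held while the work runs

    def _run(self):
        try:
            self._func(*self._args)
        finally:
            self._sentinel.release()      # plays the role of the C code releasing _tstate_lock

    def start(self):
        threading.Thread(target=self._run, daemon=True).start()

    def join(self, timeout=None):
        """Return True if the work finished, False if the timeout elapsed.
        (The real Thread.join() returns None and leaves the check to is_alive().)"""
        timeout = -1 if timeout is None else max(timeout, 0)  # -1 means "wait forever"
        if self._sentinel.acquire(True, timeout):
            self._sentinel.release()      # leave it released so later joins succeed too
            return True
        return False


if __name__ == "__main__":
    worker = SentinelWorker(time.sleep, 3.0)
    worker.start()
    print(worker.join(0.5))  # False: the work is still running
```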

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L117

```python
def acquire(self, blocking=True, timeout=-1):
    """Acquire a lock, blocking or non-blocking.

    When invoked without arguments: if this thread already owns the lock,
    increment the recursion level by one, and return immediately. Otherwise,
    if another thread owns the lock, block until the lock is unlocked. Once
    the lock is unlocked (not owned by any thread), then grab ownership, set
    the recursion level to one, and return. If more than one thread is
    blocked waiting until the lock is unlocked, only one at a time will be
    able to grab ownership of the lock. There is no return value in this
    case.

    When invoked with the blocking argument set to true, do the same thing
    as when called without arguments, and return true.

    When invoked with the blocking argument set to false, do not block. If a
    call without an argument would block, return false immediately;
    otherwise, do the same thing as when called without arguments, and
    return true.

    When invoked with the floating-point timeout argument set to a positive
    value, block for at most the number of seconds specified by timeout
    and as long as the lock cannot be acquired. Return true if the lock has
    been acquired, false if the timeout has elapsed.
    """
    me = get_ident()
    if self._owner == me:
        self._count += 1
        return 1
    rc = self._block.acquire(blocking, timeout)
    if rc:
        self._owner = me
        self._count = 1
    return rc
```

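`acquire()` gets the current thread's identity; if that thread already owns the lock, it just increments the recursion count.

Otherwise it really acquires the underlying lock, `self._block`, passing `blocking` and `timeout` straight through.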
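The two behaviours described in that docstring can be observed directly with the standard `threading.RLock`: the owner can acquire the lock again, and another thread's acquire gives up after a timeout. This is a small illustration, not part of the original walkthrough:

```python
import threading

rlock = threading.RLock()

# The owning thread may acquire again without blocking;
# only the recursion level goes up (it is now 2).
rlock.acquire()
rlock.acquire()

results = []


def try_from_other_thread():
    # A different thread cannot get the lock. With a timeout, acquire()
    # gives up and returns False once the timeout has elapsed.
    results.append(rlock.acquire(timeout=0.2))


other = threading.Thread(target=try_from_other_thread)
other.start()
other.join()
print(results)  # [False]

# Release as many times as acquired; after that the lock is free again.
rlock.release()
rlock.release()
```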

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L98

```python
self._block = _allocate_lock()
```

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L33

```python
_allocate_lock = _thread.allocate_lock
```

https://github.com/python/cpython/blob/464aaba29700905badb7137e5048f8965833f946/Lib/threading.py#L4

```python
import _thread
```

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1300-L1301

```c
static PyMethodDef thread_methods[] = {
    {"start_new_thread", (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new", (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock", (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate", (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"exit_thread", (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"exit", (PyCFunction)thread_PyThread_exit_thread,
     METH_NOARGS, exit_doc},
    {"interrupt_main", (PyCFunction)thread_PyThread_interrupt_main,
     METH_NOARGS, interrupt_doc},
    {"get_ident", (PyCFunction)thread_get_ident,
     METH_NOARGS, get_ident_doc},
    {"_count", (PyCFunction)thread__count,
     METH_NOARGS, _count_doc},
    {"stack_size", (PyCFunction)thread_stack_size,
     METH_VARARGS, stack_size_doc},
    {"_set_sentinel", (PyCFunction)thread__set_sentinel,
     METH_NOARGS, _set_sentinel_doc},
    {NULL, NULL}  /* sentinel */
};
```

This registers the `allocate_lock` method, implemented by the C function `thread_PyThread_allocate_lock` (cast to `PyCFunction`).

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L1128-L1131

```c
static PyObject *
thread_PyThread_allocate_lock(PyObject *self)
{
    return (PyObject *) newlockobject();
}
```

https://github.com/python/cpython/blob/7b90e3674be86479c51faf872d0b9367c9fc2f96/Modules/_threadmodule.c#L538-L553

```c
static lockobject *
newlockobject(void)
{
    lockobject *self;
    self = PyObject_New(lockobject, &Locktype);
    if (self == NULL)
        return NULL;
    self->lock_lock = PyThread_allocate_lock();
    self->locked = 0;
    self->in_weakreflist = NULL;
    if (self->lock_lock == NULL) {
        Py_DECREF(self);
        PyErr_SetString(ThreadError, "can't allocate lock");
        return NULL;
    }
    return self;
}
```

`newlockobject()` allocates a new lock object and the underlying platform lock via `PyThread_allocate_lock()`.
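At the bottom of this chain is an ordinary `_thread` lock, so the `timeout` passed to `join()` ends up as the timeout of a blocking lock acquire. A minimal sketch of that low-level behaviour follows; the 0.25 s value is arbitrary, and the lock is held by the main thread here only to simulate a still-running thread. Passing a timeout together with `blocking=False` is rejected with `ValueError`.

```python
import _thread
import time

# A raw lock object, the kind created by thread_PyThread_allocate_lock()/newlockobject().
lock = _thread.allocate_lock()
lock.acquire()  # now held, standing in for a still-running thread

start = time.monotonic()
got_it = lock.acquire(True, 0.25)  # blocking=True, timeout=0.25 s
elapsed = time.monotonic() - start
print(got_it)  # False: the timeout elapsed before the lock became free

# A timeout only makes sense for a blocking acquire.
try:
    lock.acquire(False, 1.0)
    rejected = False
except ValueError:
    rejected = True

lock.release()
```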

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L276

```c
PyThread_type_lock
PyThread_allocate_lock(void)
{
    sem_t *lock;
    int status, error = 0;

    dprintf(("PyThread_allocate_lock called\n"));
    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));

    if (lock) {
        status = sem_init(lock,0,1);
        CHECK_STATUS("sem_init");

        if (error) {
            PyMem_RawFree((void *)lock);
            lock = NULL;
        }
    }

    dprintf(("PyThread_allocate_lock() -> %p\n", lock));
    return (PyThread_type_lock)lock;
}
```

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread.c#L60-L77

```c
void
PyThread_init_thread(void)
{
#ifdef Py_DEBUG
    char *p = Py_GETENV("PYTHONTHREADDEBUG");

    if (p) {
        if (*p)
            thread_debug = atoi(p);
        else
            thread_debug = 1;
    }
#endif /* Py_DEBUG */
    if (initialized)
        return;
    initialized = 1;
    dprintf(("PyThread_init_thread called\n"));
    PyThread__init_thread();
}
```

https://github.com/python/cpython/blob/2d264235f6e066611b412f7c2e1603866e0f7f1b/Python/thread_pthread.h#L170-L176

```c
static void
PyThread__init_thread(void)
{
#if defined(_AIX) && defined(__GNUC__)
    extern void pthread_init(void);
    pthread_init();
#endif
}
```

https://github.com/python/cpython/blob/f243de2bc8d940316ce8da778ec02a7bbe594de1/configure.ac#L3416

```
AC_CHECK_FUNCS(alarm accept4 setitimer getitimer bind_textdomain_codeset chown \
 clock confstr ctermid dup3 execv faccessat fchmod fchmodat fchown fchownat \
 fexecve fdopendir fork fpathconf fstatat ftime ftruncate futimesat \
 futimens futimes gai_strerror getentropy \
 getgrouplist getgroups getlogin getloadavg getpeername getpgid getpid \
 getpriority getresuid getresgid getpwent getspnam getspent getsid getwd \
 if_nameindex \
 initgroups kill killpg lchmod lchown lockf linkat lstat lutimes mmap \
 memrchr mbrtowc mkdirat mkfifo \
 mkfifoat mknod mknodat mktime mremap nice openat pathconf pause pipe2 plock poll \
 posix_fallocate posix_fadvise pread \
 pthread_init pthread_kill putenv pwrite readlink readlinkat readv realpath renameat \
 select sem_open sem_timedwait sem_getvalue sem_unlink sendfile setegid seteuid \
 setgid sethostname \
 setlocale setregid setreuid setresuid setresgid setsid setpgid setpgrp setpriority setuid setvbuf \
 sched_get_priority_max sched_setaffinity sched_setscheduler sched_setparam \
 sched_rr_get_interval \
 sigaction sigaltstack siginterrupt sigpending sigrelse \
 sigtimedwait sigwait sigwaitinfo snprintf strftime strlcpy symlinkat sync \
 sysconf tcgetpgrp tcsetpgrp tempnam timegm times tmpfile tmpnam tmpnam_r \
 truncate uname unlinkat unsetenv utimensat utimes waitid waitpid wait3 wait4 \
 wcscoll wcsftime wcsxfrm wmemcmp writev _getpty)
```

http://man7.org/linux/man-pages/man7/pthreads.7.html

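All of this boils down to two things: is your timeout given in seconds (a float such as `1.0`), and do you check `is_alive()` afterwards? From the documentation:

*When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.*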