2010-01-02 24 views
3

场景:我们有一个python脚本,可以同时检查数千个proxys。 程序使用线程(每个代理1个)来加速进程。当它到达1007线程时,由于线程限制,脚本崩溃。
我的解决方案是:一个全局变量,当一个线程完成时线程产生和减少时递增。产生线程的函数监视变量,以便达不到限制。
你的解决方案是什么,朋友?绕过os线程限制的编程策略?

感谢您的答案。

回答

5

您想要使用select module执行非阻塞I/O。

有几种不同的特定技术。 select.select应该适用于每个主要平台。还有其他一些更高效的变体(如果您同时检查数以万计的连接,则可能很重要),但是您需要为您的特定平台编写代码。

+0

这可能非常好,我在“异步套接字编程”中看到了这种技术。可能比具有多个固定线程的解决方案好得多。 – 19021programmer 2010-01-02 22:43:11

3

Python是否具有任何类型的异步IO功能?这将是首选答案IMO - 为每个出站连接产生一个额外的线程并不像有一个事件驱动的单线程那么简单。

+0

那么你已经把我放在了一个很好的轨道上,谢谢。 – 19021programmer 2010-01-02 22:44:06

1

使用select模块或类似的库很可能是更有效的解决方案,但这需要更大的体系结构更改。

如果你只是想限制线程的数量,一个全局计数器应该没问题,只要你以线程安全的方式访问它。

2

使用不同的进程和管道传输数据。在Python中使用线程是非常蹩脚的。据我所知,即使你有一个多核处理器,它们也不会并行运行......但也许它已经在python3中修复了。

+0

实际上,在大多数语言中使用线程会导致锁定到单个处理器或具有如此多的内存障碍以至于多个处理器在锁定步骤中运行。这不是Python特有的。 – 2010-01-02 23:20:51

+1

+1:我会推荐多个线程的多个进程。 – 2010-01-02 23:21:49

4

我以前遇到过这种情况。只需制作一个任务池,并产生固定数量的线程,这些线程运行一个无限循环,从池中抓取任务,运行并重复。本质上,你正在实现自己的线程抽象,并使用操作系统线程来实现它。

这确实有缺点,主要的一点是如果您的任务阻塞很长一段时间,他们可以阻止执行其他任务。但它确实可以让您创建无限数量的任务,仅受内存限制。

+0

这听起来相当不错。我不必担心线程号。但是任务是检查具有可变等待时间的代理。我应该做一些统计来找出线程的最佳常量。 – 19021programmer 2010-01-02 22:36:41

+0

可以肯定我不会阻止其他任务。 – 19021programmer 2010-01-02 22:37:14

+0

我猜你有2个选项。选项1是产生1006个OS线程。如果您确实想知道您需要的最小线程数,请继续添加线程,直到达到100%的CPU利用率。这将意味着你已经超过了所有的I/O等待。 – 2010-01-02 22:56:00

1

小心减少默认线程堆栈大小。至少在Linux上,默认限制会对创建的线程数量造成严重限制。 Linux将进程虚拟地址空间块分配给堆栈(通常为10MB)。 300个线程×10MB堆栈分配=专用于堆栈的3GB虚拟地址空间,而在32位系统上则有3GB限制。你可能会少得多。

2

我的解决方案是:一个全局变量,当一个线程完成时线程产生和减少时会增加。产生线程的函数监视变量,以便达不到限制。

标准的方法是让每个线程在一个循环中获得下一个任务,而不是在处理一个之后死亡。这样你就不必跟踪线程的数量,因为你只需要触发一定数量的线程。作为奖励,你节省了线程创建/销毁。

2

一个计数信号应该做的伎俩。

from socket import * 
from threading import * 

maxthreads = 1000 
threads_sem = Semaphore(maxthreads) 

class MyThread(Thread): 
    def __init__(self, conn, addr): 
     Thread.__init__(self) 
     self.conn = conn 
     self.addr = addr 
    def run(self): 
     try: 
      read = conn.recv(4096) 
      if read == 'go away\n': 
       global running 
       running = False 
      conn.close() 
     finally: 
      threads_sem.release() 

sock = socket() 
sock.bind(('0.0.0.0', 2323)) 
sock.listen(1) 
running = True 
while running: 
    conn, addr = sock.accept() 
    threads_sem.acquire() 
    MyThread(conn, addr).start() 
1

Twisted非常适合这个问题。有关编写客户端的教程,请参见http://twistedmatrix.com/documents/current/core/howto/clients.html

如果您不介意使用替代Python实现,则Stackless具有轻量级(非本地)线程。我认识的唯一一家公司是CCP,他们在客户端和服务器上使用它作为游戏中的任务。您仍然需要使用Stackless进行异步I/O,因为如果线程阻塞,进程将被阻止。

+0

非常感谢。我会尝试扭曲。 – 19021programmer 2010-01-02 23:47:48

1

正如另一个线程中提到的那样,为什么每次操作都会产生一个新线程?这是一个经典的生产者 - 消费者问题,不是吗?取决于你如何看待它,代理检查器可能是商业用户或生产商。

无论如何,解决方案是对“任务”进行“排队”处理,并使循环中的线程检查是否有任何其他任务要在队列中执行,如果没有,请等待预定义的时间间隔,然后再次检查。

您应该使用一些锁定机制(即信号量)来保护您的队列以防止竞争条件。

这其实并不困难。但它需要一点思考才能做到。祝你好运!