2014-01-16 21 views
0

我试图运行一个http POST的进程,这个进程会发送一个警报(发送警报的时间以纳秒为单位)到服务器。我试图在毫秒内测试服务器处理警报的能力。按照给定的标准,服务器据说可以处理6000次警报/秒。Python:如何在同一时刻触发多个进程

我创建一块使用多处理模块,其发送警报6000的代码,但我使用一个for循环,并因此所用的时间来执行对循环超过超过一秒。因此,所有的6000进程都不会在SAME INSTANT中触发。

有没有办法在同一时刻触发多个(N号)进程?

这是我的代码:flowtesting.py这是一个库。这也是之后进行后 '####'

进口JSON 进口httplib2的

类flowTesting()我的脚本: 高清初始化(个体经营,companyId,deviceIp): self.companyId = companyId self.deviceIp = deviceIp

def generate_savedSearchName(self, randNum): 
    self.randMsgId = randNum 
    self.savedSearchName = "TEST %s risk31 more than 3" % self.randMsgId 

def def_request_body_dict(self): 
    self.reqBody_dict = \ 
     { "Header" : {"agid" : "Agent1", 
         "mid": self.randMsgId, 
         "ts" : 1253125001 
     }, 
      "mp": 
       { 
        "host" : self.deviceIp, 
        "index" : self.companyId, 
        "savedSearchName" : self.savedSearchName, 
       } 
     } 
    self.req_body = json.dumps(self.reqBody_dict) 

def get_default_hdrs(self): 
    self.hdrs = {'Content-type': 'application/json', 
       'Accept-Language': 'en-US,en;q=0.8'} 

def send_request(self, sIp, method="POST"): 
    self.sIp = sIp 
    self.url = "http://%s:8080/agent/splunk/messages" % self.sIp 

    http_cli = httplib2.Http(timeout=180, disable_ssl_certificate_validation=True) 
    rsp, rsp_body = http_cli.request(uri=self.url, method=method, headers=self.hdrs, body=self.req_body) 
    print "rsp: %s and rsp_body: %s" % (rsp, rsp_body) 

# My testScript 
from flowTesting import flowTesting 
import random 
import multiprocessing 

deviceIp = "10.31.421.35" 
companyId = "CPY0000909" 
noMsgToBeSent = 1000 
sIp = "10.31.44.235" 
uniq_msg_id_list = random.sample(xrange(1,10000), noMsgToBeSent) 

def runner(companyId, deviceIp, uniq_msg_id): 
    proc = flowTesting(companyId, deviceIp) 
    proc.generate_savedSearchName(uniq_msg_id) 
    proc.def_request_body_dict() 
    proc.get_default_hdrs() 
    proc.send_request(sIp) 

process_list = [] 
for uniq_msg_id in uniq_msg_id_list: 
    savedSearchName = "TEST-1000 %s risk31 more than 3" % uniq_msg_id 

    process = multiprocessing.Process(target=runner, args=(companyId,deviceIp,uniq_msg_id,)) 
    process.start() 
    process.join() 
    process_list.append(process) 

print "Process list: %s" % process_list 
print "Unique Message Id: %s" % uniq_msg_id_list 
+1

简短回答:否。除非你的服务器有6000个内核来同时运行6000任务。如果你打算以快速的顺序执行它们(但不一定在同一时间),有办法。首先,你可以在这里发布相关部分的代码吗? –

+3

即使您的计算机_does_有6000个内核,并且您的操作系统可以完美安排它们,并且您使用了例如[Barrier](http://docs.python.org/3.3/library/threading.html#barrier - 对象),让它们全部触发......它们仍然会被你的网卡,你的路由器,路上的所有路由器和服务器的网卡排队。 – abarnert

+0

对于像这样的IO操作,根本不需要多个进程。您只需要在一个异步事件循环中从单个程序中打开多个套接字。尽管编码有点复杂。 – Keith

回答

1

把他们都在同一时刻发生显然是不可能的,除非你有一个6000核的机器和OS内核,其调度程序能够处理所有这些完美的(你不),你不能得到一次运行6000个代码。

而且,即使你做了,他们一直努力在做的是一个套接字上发送消息。即使你的内核是疯狂的并行,除非你有6000个独立的网卡,它们最终会在网卡缓冲区中串行化。这就是IP的工作方式:一个接一个的包。当然还有路径上的所有路由器,服务器的NIC,服务器的操作系统等。即使IP不阻碍,字节也需要花费时间通过电缆传输。因此,即使在理论上同时做到这一点的唯一方法是每边都有6000个NIC,并使用相同的光纤将它们直接连接到彼此。

不过,你真的不需要他们在同一时刻,只是彼此更加接近比他们。你没有向我们展示你的代码,但大概你只是开始6000 Process es,都立即尝试发送消息。这意味着你需要包括进程启动时间 - 这可能会很慢(特别是在Windows上) - 在歪斜时间。

您可以通过使用线程而不是进程来减少这种情况。这看起来可能违反直觉,但是Python在处理I/O绑定的线程方面非常出色,而且每个现代操作系统都非常擅长启动新线程。

但是实际上,您需要的是您的线程或进程上的Barrier,以让他们在任何人尝试执行任何工作之前完成所有设置工作(包括进程启动)。

它仍然可能不会是不够紧,但是这将是更严格了很多可能比你现在所拥有的。


您将面临的下一个限制是上下文切换时间。现代操作系统在调度方面非常出色,但不是6000个并发任务。所以真的,你想减少到N个进程,每个进程只是尽可能快地按顺序发送6000/N个连接。这会让他们进入内核/网卡的速度比一次尝试6000并使操作系统为您做序列化要快得多。 (事实上​​,在某些平台上,这取决于你的硬件,你实际上可能是有一个过程连续N多干什么6000/N做6000更好。测试两者兼得。)


目前仍然有些套接字库本身的开销。为了解决这个问题,您需要预先制作所有IP数据包,然后创建一个原始套接字并将这些数据包垃圾邮件。发送来自每个连接的第一个数据包,然后从每个连接发送第二个数据包等。

+0

由于GIL,使用Python线程可能不起作用。 – Meh

+0

@Adal:对于I/O限制的代码(至少在3.2+以上),GIL不会导致严重问题,仅限于CPU绑定代码。还有一些开销,但与流程启动开销相比,可能会少很多。这就是我的意思,“这看起来有点违反直觉,但是Python在处理I/O绑定的线程方面非常好,而且每个现代操作系统都非常擅长启动新线程。” – abarnert

0

您需要使用进程间同步原语。在Linux上,您可以使用Sys-V信号量,在Windows上您可以使用Win32事件。

您的6000进程将等待此信号量/事件,并从不同的进程触发它,从而将您的所有6000进程从等待状态释放到就绪状态,然后操作系统将尽快执行它们尽可能。

相关问题