2012-07-17 22 views
0

我在我的Java应用程序中使用ZMQ。我发现它表现不均匀,即如果我发送100条消息,一个消费者说需要1秒,那么如果我们继续增加消费者,所花费的时间就会变成2,1.5,3。没有逐渐增加或减少。我该如何纠正这一点。寻找我的代码如下ZMQ中流量不均

// Broker 

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Context; import org.zeromq.ZMQ.Socket; import org.zeromq.ZMQStreamer;

public class Broker {

/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Context context = ZMQ.context(1); 

    Socket frontEnd = context.socket(ZMQ.PULL); 
    frontEnd.bind("tcp://*:5555"); 

    Socket backEnd = context.socket(ZMQ.PUSH); 
    backEnd.bind("tcp://*:5560"); 

    ZMQStreamer zmqStreamer = new ZMQStreamer(context, frontEnd, backEnd); 
    zmqStreamer.run(); 
} 

}

// Producer

import org.zeromq.ZMQ; import org.zeromq.ZMQ.Socket;

public class Producer {

public void init() 
{ 
    ZMQ.Context context = ZMQ.context(1); 
    socket = context.socket(ZMQ.PUSH); 
    socket.connect("tcp://localhost:5555"); 
} 

public void initMessage(String message) 
{ 
    this.message = message; 
} 

public void sendMessage() 
{ 
    String sendMessage = System.nanoTime() +"#"+ message; 
    socket.send(sendMessage.getBytes(), 0); 
} 
/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Producer producer = new Producer(); 
    producer.init(); 
    byte[] message = new byte[Integer.parseInt(args[0])]; 
    //message = "Hello".getBytes(); 
    producer.initMessage(new String(message)); 
    for(int i=0;i<100;i++) 
    { 
     producer.sendMessage(); 
    } 
} 

private Socket socket = null; 
private String message; 
} 

//Consumer 

import org.zeromq.ZMQ; 
import org.zeromq.ZMQ.Socket; 

public class Consumer 
{ 

public void init() 
{ 
    ZMQ.Context context = ZMQ.context(1); 
    socket = context.socket(ZMQ.PULL); 
    socket.connect("tcp://localhost:5560"); 
} 

public void reciveMessage() 
{ 
    byte[] recived = socket.recv(0); 
    //System.out.println(recived.length); 
    long recivedTime = System.nanoTime(); 
    String message = new String(recived); 
    String[] splitMessage = message.split("#"); 
    long sendTime = Long.parseLong(splitMessage[0]); 
    System.out.println("Send Time " + sendTime + " RecivedTime " 
      + recivedTime + " Time taken " + (recivedTime - sendTime) 
      + " Message " + message); 
} 
/** 
* @param args 
*/ 
public static void main(String[] args) 
{ 
    Consumer consumer = new Consumer(); 
    consumer.init(); 
    for (int i=0;i<100;i++) 
    { 
     consumer.reciveMessage(); 
    } 
} 
private Socket socket = null; 
} 
+0

给我看一些代码 – Schildmeijer 2012-07-17 17:51:10

+0

Schildmeijer:我已经附上了代码。请通过它。 – Muzy 2012-07-18 05:14:38

+0

我知道这不是建设性的,但你的问题的标题让我想起珍珠果酱! – 2012-07-18 05:16:13

回答

1

为了可靠地时间一块多线程代码,你将需要有在收集器/接收器(同步的开始和结束时间,你现在做的一些方法没有显示被编程)。

从ZMQ指导,其中规定了以下过程的划分数据集的正确ZMQ方式之一退房this example

...

我们的超级计算应用是一个相当典型的并行处理模型:

We have a ventilator that produces tasks that can be done in parallel. 
We have a set of workers that process tasks. 
We have a sink that collects results back from the worker processes. 

...