2013-10-07 61 views
0

我有两个线程的应用程序,一个写入队列,另一个读取异步。 我需要创建第三个生成20多个。 新创建的线程将运行直到明确停止。那20个线程应该获得“实时”数据以分析它。 每个20都有一个唯一的ID /名称。我需要将相关数据(即READ线程收集)发送到正确的线程(20个线程)。例如如果数据包含一个id为2的字符串 - >我需要将它发送到ID = 2的线程。 我的问题是:我应该如何为20个线程中的每一个持有一个“指针”并将相关数据发送给它? (我可以搜索可运行列表中的id(这将保持线程) - >但是然后我需要调用一个方法“NewData(string)”来将数据发送到正在运行的线程)。 我该怎么做? TIA PazJava多线程消息传递

+0

@OldCurmudgeon有一个很好的答案。有一件事你可能要小心的是“发送数据”到一个线程的概念。快速阅读Java Concurrency教程,它可以帮助您更好地理解这些概念:http://docs.oracle.com/javase/tutorial/essential/concurrency/ – Meesh

+0

谢谢,这个我已经阅读 - >不能找到答案我的需求,欢呼 – user2319608

回答

3

您可能会更好地使用队列与您的线程进行通信。然后,您可以将所有队列放入地图中以便于访问。我会推荐一个BlockingQueue

public class Test { 
    // Special stop message to tell the worker to stop. 
    public static final Message Stop = new Message("Stop!"); 

    static class Message { 
    final String msg; 

    // A message to a worker. 
    public Message(String msg) { 
     this.msg = msg; 
    } 

    public String toString() { 
     return msg; 
    } 

    } 

    class Worker implements Runnable { 
    private volatile boolean stop = false; 
    private final BlockingQueue<Message> workQueue; 

    public Worker(BlockingQueue<Message> workQueue) { 
     this.workQueue = workQueue; 
    } 

    @Override 
    public void run() { 
     while (!stop) { 
     try { 
      Message msg = workQueue.poll(10, TimeUnit.SECONDS); 
      // Handle the message ... 

      System.out.println("Worker " + Thread.currentThread().getName() + " got message " + msg); 
      // Is it my special stop message. 
      if (msg == Stop) { 
      stop = true; 
      } 
     } catch (InterruptedException ex) { 
      // Just stop on interrupt. 
      stop = true; 
     } 
     } 
    } 
    } 

    Map<Integer, BlockingQueue<Message>> queues = new HashMap<>(); 

    public void test() throws InterruptedException { 
    // Keep track of my threads. 
    List<Thread> threads = new ArrayList<>(); 
    for (int i = 0; i < 20; i++) { 
     // Make the queue for it. 
     BlockingQueue<Message> queue = new ArrayBlockingQueue(10); 
     // Build its thread, handing it the queue to use. 
     Thread thread = new Thread(new Worker(queue), "Worker-" + i); 
     threads.add(thread); 
     // Store the queue in the map. 
     queues.put(i, queue); 
     // Start the process. 
     thread.start(); 
    } 

    // Test one. 
    queues.get(5).put(new Message("Hello")); 

    // Close down. 
    for (BlockingQueue<Message> q : queues.values()) { 
     // Stop each queue. 
     q.put(Stop); 
    } 

    // Join all threads to wait for them to finish. 
    for (Thread t : threads) { 
     t.join(); 
    } 
    } 

    public static void main(String args[]) { 
    try { 
     new Test().test(); 
    } catch (Throwable t) { 
     t.printStackTrace(System.err); 
    } 
    } 

} 
+0

很多谢谢,我已经使用BlockQueue的2个线程,“写入”和“读取”到这个队列的初始数据。但是在你的实现中 - >每个工人(我将有20个)将拥有自己的队列 - >所以我将有20个队列,并且我需要40个 - >我的主要类中将有40个队列 - - >这是否有效? – user2319608

+0

另外:为什么我需要所有线程等待? (t.join) - >每个线程都是一个独立的线程,它必须处理它自己的消息(有时可能是空的,有时候可能是空的) – user2319608

+0

@ user2319608 - 我使用连接允许所有线程在我的测试结束。你可能不需要那样做。我不明白你需要40个线程。队列只占用很小的空间,只要你限制它们的大小。我已经限制每个队列10项,如果你愿意,你可以少用。 – OldCurmudgeon