2012-04-14 52 views
3

我认为我做错了。我正在创建线程,以便从共享队列中剔除一些数据。我的问题是程序速度慢,内存不足,我怀疑队列可能不像我希望的那样共享。我怀疑这是因为在我的代码中,我添加了一行显示队列的大小,如果我启动2个线程,然后我得到两个完全不同数字的输出,似乎自己增加(我认为它可能是相同的数字,但也许它从100跳到2,等等,但看完它后显示105和5,并以不同的速率运行,如果我有4个线程,那么我看到4个不同的数字)。如何实现一个可以被多个线程处理的队列?

这是相关部分的片段。我在节目

static class queue_class { 
     int number; 
     int[] data; 
     Context(int number, int[] data) { 
      this.number = number; 
      this.data = data; 
     } 
    } 

然后我将一些职位的调用后创建队列的顶部创建队列我想要的数据静态类..

static class process_threaded implements Callable<Void> { 
    // queue with contexts to process 
    private Queue<queue_class> queue; 

    process_threaded(queue_class request) { 
     queue = new ArrayDeque<queue_class>(); 
     queue.add(request); 
    } 

    public Void call() { 
     while(!queue.isEmpty()) { 
      System.out.println("in contexts queue with a size of " + queue.size()); 
      Context current = contexts.poll(); 
      //get work and process it, if it work great then the solution goes elsewhere 
      //otherwise, depending on the data, its either discarded or parts of it is added back to queue 
      queue.add(new queue_class(k, data_list)); 

,你可以看到,数据有3个选项,如果数据是好的,就会被发送出去,如果数据完全可怕或丢回队列就丢弃。我认为队列正在发送,但我怀疑是因为每个线程都在自己的队列中工作,而不是共享队列。

这是猜测正确的,我在做这错了吗?

+0

此代码不会编译。你是否尝试编译和执行某些东西,然后问一个具体的问题,而不是编写一个不被java编译器接受的伪代码? – 2012-04-14 04:24:19

+0

我的实际代码编译..我没有想到任何人都想编译它,所以我拿了代码,清理了与问题无关的信息,只是为了展示我在做什么。我不需要代码只是逻辑,并认为上面的代码应该让我知道我在做什么。 – 2012-04-14 04:33:28

+0

您应该阅读[实践中的Java并发](http://www.amazon.com/Java-Concurrency-Practice-Brian-Goetz/dp/0321349601)以编写线程安全代码的专业知识。 – 2012-04-14 04:54:35

回答

2

你在你的评估,即每个线程(可能)拥有自己的队列工作,因为你正在创建你的Callable的构造队列正确。 (它实际上很奇怪有一个Callable<Void> - 是不是只是一个Runnable?)

还有其他问题,例如,您正在处理一个非线程安全的队列,或者你的代码在编写时不会编译的事实。

但是,重要的问题是你真的需要显式创建一个队列吗?为什么没有ExecutorService你提交Callable(或Runnables,如果你决定进行切换):将执行者的引用传递到你的Callable,他们可以将新的Callable添加到执行者的任务队列中以运行。没有必要重新发明轮子。

例如:

static class process_threaded implements Runnable { 
    // Reference to an executor 
    private final ExecutorService exec; 
    // Reference to the job counter 
    private final AtomicInteger jobCounter; 
    // Request to process 
    private queue_class request; 

    process_threaded(ExecutorService exec, AtomicInteger counter, queue_class request) { 
     this.exec = exec; 
     this.jobCounter = counter; 
     this.jobCounter.incrementAndGet(); // Assuming that you will always 
              // submit the process_threaded to 
              // the executor if you create it. 
     this.request = request; 
    } 

    public run() { 
     //get work and process **request**, if it work great then the solution goes elsewhere 
     //otherwise, depending on the data, its either discarded or parts of are added back to the executor 
     exec.submit(new process_threaded(exec, new queue_class(k, data_list))); 

     // Can do some more work 

     // Always run before returning: counter update and notify the launcher 
     synchronized(jobCounter){ 
      jobCounter.decrementAndGet(); 
      jobCounter.notifyAll(); 
     } 
    } 
} 

编辑:

为了解决您的时候关闭执行的问题,我觉得最简单的办法是有一个工作计数器,关机时达到0.对于螺纹安全,AtomicInteger可能是最佳选择。我在上面添加了一些代码来包含更改。然后你的启动代码看起来像这样:

void theLauncher() { 

    AtomicInteger jobCounter = new AtomicInteger(0); 

    ExecutorService exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcesses()); 

    exec.submit(new process_threaded(exec, jobCounter, someProcessRequest)); 
    // Can submit some other things here of course... 

    // Wait for jobs to complete: 
    for(;;jobCounter.get() > 0){ 
     synchronized(jobCounter){ // (I'm not sure if you have to have the synchronized block, but I think this is safer. 
      if(jobCounter.get() > 0) 
       jobCounter.wait(); 
     } 
    } 

    // Now you can shutdown: 
    exec.shutdown(); 
} 
+0

有趣,没有想到(只是将它提交给ExecutorService)。一些基本的问题。如果我从线程本身做到这一点,或者这不重要,会是一个问题吗? ExecutorService也是安全的多线程进入它(当然,我知道它可以分配任务,但它可以同时采取多个?)?该代码是给我,对不起,所以我想提高我得到了什么,没有实现明显简单的解决方案.. – 2012-04-14 04:38:14

+0

@learningJava是的,它是线程安全的,从不同的线程任务提交到同一个执行者。 (实际上,Java 7中的新ForkJoinTask东西是如何工作的)。顺便说一句,你可能想看看'ForkJoinTask',它可能很适合这个任务。 – trutheality 2012-04-14 04:45:32

+0

我现在正在使用java6..jave7似乎解决了我所有的问题(编程问题,我的意思是)。我会测试它并报告.. – 2012-04-14 04:52:21

2

不要重新发明轮子!如何使用ConcurrentLinkedQueue?从javadocs:

基于链接节点的无界线程安全队列。该队列命令元素FIFO(先进先出)。队列头是最长时间在队列中的元素。队列的尾部是已经在队列上的最短时间的元素。新元素插入到队列尾部,队列检索操作获取队列头部的元素。当许多线程将共享对共同集合的访问时,ConcurrentLinkedQueue是一个合适的选择。

+0

我给它一个shot..thanks我虽然它仅是为java7,但猜测not..thanks。 – 2012-04-14 04:50:38

+0

确保你创建一个单曲在他们共同分享的任务之外领取。如果每个任务创建自己的队列 - 线程安全或不 - 你会遇到问题。 – 2012-04-14 04:53:33

+0

'ConcurrentLinkedQueue'自Java 1.5起可用 – 2012-04-14 04:54:59

相关问题