2017-09-26 67 views
0

我的问题是,我有一个1000条记录的数据集。我想要3个线程处理这样的数据, 从记录1到300的线程1,从301到600的线程2等等。一个线程可以发出请求并一次获取50条记录,创建一个对象并将其放入队列中。主线程将同时从队列中读取数据。主线消费者和其他线程的生产者

下面是代码,我面对的问题是,recordRead变量告诉线程应该从哪里开始读取记录的起点。 但我怎么能为每个线程设置不同的值,例如对于thread1,它应该是0,recordsToRead应该是300,对于thread2,recordRead应该是300,recordsToRead是300 + 300 = 600,最后一个线程应该是600,结束。 pagesize = 50 pagesize,recordRead和recordToRead是属于主类和主线程的所有变量。

ExecutorService service = Executors.newFixedThreadPool(nThreads); 
    while(nThreads > 0) { 
     nThreads--; 
     service.execute(new Runnable() { 

      @Override 
      public void run() { 
       // TODO Auto-generated method stub 

       do { 
        int respCode = 0; 
        int RecordsToRead = div; 
        JSONObject jsObj = new JSONObject(); 
        jsObj.put("pagesize", pageSize); 
        jsObj.put("start", recordsRead); 
        jsObj.put("searchinternalid", searchInternalId); 

        try { 
         boolean status = req.invoke(jsObj); 
         respCode = req.getResponseCode(); 

        } catch (Exception e) {   
         req.reset(); 
         e.printStackTrace(); 
         return true; 
        } 
        JSONObject jsResp = req.getResponseJson(); 
        //here jsResp will be added to ArrayBlockingQueue. 

        req.reset(); 
       }while(!isError && !isMaxLimit && recordsRead < RecordsToRead); 

      } 

     }); 
    } 

此循环将是主线程读取队列的代码。 如何为所有线程设置recordsRead和recordToread。

以及如何使主线程等待,直到一个线程插入队列中的对象。

+0

您可以创建'Runnable'的子类作为类ctor参数的起始位置(以及其他)。 –

回答

0

我在你看来定义了两个问题。第一个问题是执行并行块计算,第二个问题是从中创建一个连续的管道。让我们从第一个问题开始。要使用预定义大小进行并行计算,最佳选择fmpv将使用fork-join框架。不仅在性能上(偷工作真的很有效),而且由于代码简单。但是因为你对我来说只限于3个线程,所以直接使用线程似乎也是有效的。只要你想要我可以这样实现:

final int chunkSize = 300; 
    //you can also use total amount of job 
    //int totalWork = 1000 and chunk size equals totalWork/threadsNumber 
    final int threadsNumber = 3; 

    Thread[] threads = new Thread[threadsNumber]; 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     final int i = ii; 

     threads[ii] = new Thread(() -> { 
      //count your variable according the volume 
      // for example you can do so 
      int chunkStart = i * chunkSize; 
      int chunkEnd = chunkStart + chunkSize; 
      for(int j = chunkStart; j < chunkEnd; j++) { 
       //object creation with necessary proprs 
       //offer to queue here 
      } 
     }); 

     threads[ii].start(); 
    } 

    //your code here 
    //take here 

    for (int ii = 0; ii < threadsNumber; ii++) { 
     try { 
     //this part is only as example 
     //you do not need it     
     //here if you want you can also w8 for completion of all threads 
      threads[ii].join(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

现在关于消费的第二个问题。对于这个puprose,你可以使用例如ConcurrentLinkedBlockingQueue(http://www.jgroups.org/javadoc/org/jgroups/util/ConcurrentLinkedBlockingQueue.html)。在生产者线程中提供报价并使用主要的take方法。

但说实话,我仍然没有得到你的问题的原因。你想创建连续的管道还是只是一次计算?

另外我会建议你参加这门课程:https://www.coursera.org/learn/parallel-programming-in-java/home/welcome。 这将帮助您完全解决您的问题并提供各种解决方案。还有并发和分布式计算课程。

相关问题