2013-07-29 103 views
2

我有责任链,它由3个阶段。与线程池链责任

Stages

我对阶段实现这个样子。

public class InitialStage implements RecordHandler { 

    private RecordHandler next; 

    @Override 
    public void setNext(RecordHandler handler) { 
     if (this.next == null) { 
      this.next = handler; 
     } else { 
      this.next.setNext(handler); 
     } 
    } 

    @Override 
    public void handleRequest(String record) { 
     System.out.println("Processing @ Initial Stage."); 
     if (next != null) { 
      next.handleRequest(record); 
     } 
    } 
} 

流程经理

public class ProcessManager { 
    private RecordProcessor processor = new RecordProcessor(); 

    public ProcessManager() 
    { 
     createPipeLine(); 
    } 

    private void createPipeLine() { 
     processor = new RecordProcessor(); 
     processor.addHandler(new InitialStage()); 
     processor.addHandler(new MiddleStage()); 
     processor.addHandler(new FinalStage()); 
    } 

    public void processRecord(String record){ 
     processor.handleRequest(record); 
    } 
} 

接口

public interface RecordHandler { 
    public void setNext(RecordHandler handler); 
    public void handleRequest(String record); 
} 

最后的RecordProcessor,

public class RecordProcessor { 

    private RecordHandler successor; 
    private RecordHandler first; 

    public RecordProcessor(){ 
    } 

    public void addHandler(RecordHandler recordHandler){ 
     if(this.first == null){ 
      this.first = recordHandler; 
     } 
     else{ 
      this.successor.setNext(recordHandler); 
     } 
     this.successor = recordHandler; 
    } 

    public void handleRequest(String record){ 
     first.handleRequest(record); 
    } 
} 

现在我观察到我的中期阶段需要一些时间来完成。所以现在我对使用一个线程池感兴趣,因此记录将以下面的方式处理。

在这里,我认为3个工作线程。

Suggested Chain

问题

如何正确地修改我现有的COR,以满足新的要求?

+0

通过在类似于http://ideone.com/ql0848的阶段之间传递'Future's? – zapl

回答

1

运行这个时候回答我的问题,为社会的利益是必须的,因为我找到了一个可行的解决方案的任何线程同步。

我开发了这个问题稍有不同的解决方案。我没有使用并行管道,而是使用多线程的舞台。 (在这种情况下的中间阶段)。换句话说,不是传递单个记录,而是中间阶段使用输入队列和输出队列。当输入队列填满定义数量的记录时,线程产生。

后续阶段可能会被闲置,直到队列已满,但仍这个实现是可以接受的。当不需要测序时可以使用这个功能。如果您需要按照顺序,那么你将需要保持序列ID,并在输出队列进行排序。

中期

public class MiddleStage implements RecordHandler { 

private RecordHandler next; 
private ExecutorService executor = Executors.newFixedThreadPool(5); 
private Queue<String> inbound = new LinkedList<String>(); 
Collection<Callable<String>> tasks = new ArrayList<Callable<String>>(); 

@Override 
public void setNext(RecordHandler handler) { 
    if (this.next == null) { 
     this.next = handler; 
    } else { 
     this.next.setNext(handler); 
    } 
} 

@Override 
public void handleRequest(String record) { 
    System.out.println("Adding new record to Queue : " + record); 
    inbound.add(record); 
    System.out.println("Queue Size : " + inbound.size()); 

    if (inbound.size() >= 10) 
    { 
     System.out.println("Processing the batch."); 

     for (int i = 0; i < 10; i++){ 
      tasks.add(new MiddleWorker(inbound.poll())); 
     } 

     System.out.println("Processing @ Middle Stage. " + record); 
     List <Future<String>> results = null; 
     try { 
      results = executor.invokeAll(tasks, 60, TimeUnit.SECONDS); 
      tasks.clear(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     for (Future<String> f : results) { 
       try { 
       String answer = f.get(); 
       System.out.println("Outbound : " + answer); 
       if (next != null) { 
        next.handleRequest(answer); 
       } 

      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
      } 
     results.clear(); 
    } 
} 
} 

的MiddleWorker类

public class MiddleWorker implements Callable<String> { 

private String output; 

public MiddleWorker(String record) 
{ 
    output = record + " : OK"; 
} 

@Override 
public String call() throws Exception { 
    try 
    { 
     Thread.sleep(2 * 1000); 
    } 
    catch(final InterruptedException ex) 
    { 
     ex.printStackTrace(); 
    } 

    return (output); 
} 
} 

请做出评论,如果这个答案是不清楚,我将修改的答案。

0

您可以创建一个额外的“舞台”有路过的工作负载为一个线程池的责任。

public class PoolExecutingStage implements RecordHandler { 

    private Executor threadPool; 
    private RecordHandler next; 

    public PoolExecutingStage(Executor tp) { 
     this.threadPool = tp;  
    } 

    @Override 
    public void setNext(RecordHandler handler) { 
      // similar to your example 
    } 

    @Override 
    public void handleRequest(String record) { 
     if (next != null) { 
      threadPool.execute(new Runnable() { 
       public void run() { 
        next.handleRequest(record); 
       } 
      });    
     } 
    } 
} 

接下来,您将需要通过添加调用processor.addHandler(new PoolExecutingStage(configuredThreadPool));初始和中间阶段之间,包括在您的管道新阶段。


也记以处理可能在多线程