3

我的目标是按顺序发布异步事件,这些异步事件也按顺序到达并且需要任意时间进行处理。所以下面是我目前的实施只使用waitnotifyMyThread处理事件,按id将结果放入哈希表,并在发布此事件前按顺序通知Scheduler线程(如果它已被阻止)。按顺序处理异步事件并发布结果

使用java.util.concurrent包实现此功能的方法会更好,更简洁吗?

import java.util.Map; 
import java.util.Random; 
import java.util.concurrent.ConcurrentHashMap; 


public class AsyncHandler { 
    private final Map<Integer, Object> locks = new ConcurrentHashMap<Integer, Object>(); 
    private final Map<Integer, Result> results = new ConcurrentHashMap<Integer, Result>(); 
    private static final Random rand = new Random(); 

    public AsyncHandler() { 
     new Scheduler(this, locks, results).start(); 
    } 

    public void handleEvent(Event event) { 
     System.out.println("handleEvent(" + event.id + ")"); 
     new MyThread(this, event, locks, results).start(); 
    } 

    public Result processEvent (Event event) { 
     System.out.println("processEvent(" + event.id + ")"); 
     locks.put(event.id, new Object()); 

     try { 
      Thread.sleep(rand.nextInt(10000)); 
     } catch (InterruptedException e) { 
      System.out.println(e); 
     } 

     return new Result(event.id); 
    } 

    public void postProcessEvent (Result result) { 
     System.out.println(result.id); 
    } 

    public static void main (String[] args) { 
     AsyncHandler async = new AsyncHandler(); 

     for (int i = 0; i < 100; i++) { 
      async.handleEvent(new Event(i)); 
     } 
    } 
} 

class Event { 
    int id; 

    public Event (int id) { 
     this.id = id; 
    } 
} 

class Result { 
    int id; 

    public Result (int id) { 
     this.id = id; 
    } 
} 

class MyThread extends Thread { 
    private final Event event; 
    private final Map<Integer, Object> locks; 
    private final Map<Integer, Result> results; 
    private final AsyncHandler async; 

    public MyThread (AsyncHandler async, Event event, Map<Integer, Object> locks, Map<Integer, Result> results) { 
     this.async = async; 
     this.event = event; 
     this.locks = locks; 
     this.results = results; 
    } 

    @Override 
    public void run() { 
     Result res = async.processEvent(event); 
     results.put(event.id, res); 

     Object lock = locks.get(event.id); 

     synchronized (lock) { 
      lock.notifyAll(); 
     } 
    } 
} 

class Scheduler extends Thread { 
    private int curId = 0; 
    private final AsyncHandler async; 
    private final Map<Integer, Object> locks; 
    private final Map<Integer, Result> results; 

    public Scheduler (AsyncHandler async, Map<Integer, Object> locks, Map<Integer, Result> results) { 
     this.async = async; 
     this.locks = locks; 
     this.results = results; 
    } 

    @Override 
    public void run() { 
     while (true) { 
      Result res = results.get(curId); 
      if (res == null) { 
       Object lock = locks.get(curId); 

       //TODO: eliminate busy waiting 
       if (lock == null) { 
        continue; 
       } 

       synchronized (lock) { 
        try { 
         lock.wait(); 
        } catch (InterruptedException e) { 
         System.out.println(e); 
         System.exit(1); 
        } 
       } 
       res = results.get(curId); 
      } 

      async.postProcessEvent(res); 
      results.remove(curId); 
      locks.remove(curId); 
      curId++; 
     } 
    } 
} 
+0

您可以使用[的ConcurrentLinkedQueue(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html)来处理传入事件的cuncurency和秩序,启动结果[Future](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/Future.html)并将它们放入队列中的线程 –

回答

2

是的并发库会简单得多。

ExecutorService旨在包装一个线程池和一个队列,为每个任务返回一个Future并提供等待结果的任何线程。

如果要按顺序处理结果,请按顺序处理未来结果。

要处理异步结果,以便你可以做

public class Main { 
    public static void main(String[] args) { 
     Main main = new Main(); 
     for (int i = 0; i < 1000; i++) { 
      final int finalI = i; 
      main.submitTask(new Callable<Long>() { 
       @Override 
       public Long call() throws Exception { 
        long millis = (long) (Math.pow(2000, Math.random())); 
        Thread.sleep(millis); 
        return millis; 
       } 
      }, new ResultHandler<Long>() { 
       @Override 
       public void onFuture(Future<Long> future) throws ExecutionException, InterruptedException { 
        System.out.println(new Date() + ": " + finalI + " - Slept for " + future.get() + " millis"); 
       } 
      }); 
     } 
     main.shutdown(); 
    } 


    public interface ResultHandler<T> { 
     void onFuture(Future<T> future) throws Exception; 
    } 

    private final ExecutorService pool = Executors.newFixedThreadPool(10); 
    private final ExecutorService result = Executors.newSingleThreadExecutor(); 

    public synchronized <T> void submitTask(Callable<T> callable, final ResultHandler<T> resultHandler) { 
     final Future<T> future = pool.submit(callable); 
     result.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        resultHandler.onFuture(future); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 

    public void shutdown() { 
     pool.shutdown(); 
     result.shutdown(); 
    } 
} 

打印

Wed Oct 02 16:32:07 CEST 2013: 0 - Slept for 1 millis 
Wed Oct 02 16:32:07 CEST 2013: 1 - Slept for 1899 millis 
Wed Oct 02 16:32:09 CEST 2013: 2 - Slept for 32 millis 
Wed Oct 02 16:32:09 CEST 2013: 3 - Slept for 32 millis 
Wed Oct 02 16:32:09 CEST 2013: 4 - Slept for 214 millis 
Wed Oct 02 16:32:09 CEST 2013: 5 - Slept for 366 millis 
... many deleted ... 
Wed Oct 02 16:32:09 CEST 2013: 82 - Slept for 6 millis 
Wed Oct 02 16:32:09 CEST 2013: 83 - Slept for 1636 millis 
Wed Oct 02 16:32:10 CEST 2013: 84 - Slept for 44 millis 
Wed Oct 02 16:32:10 CEST 2013: 85 - Slept for 1 millis 

你可以看到,虽然一些任务需要比别人更长的时间,输出的顺序是任务是顺序添加。您也可以看到它正在同一秒处理许多任务(同时)

+0

什么是最好的数据结构实现保持结果?它应该是某种优先级队列吗?仍然不知道如何执行发布命令? –

+0

@NikolayKuznetsov查看我给出的更新示例。 –

+1

非常感谢!所以为了实现发布顺序提交给'result'池,然后调用阻止调用的'Future.get',对吧? –

1

或者,您可以将期货排队,而不是将后处理安排到单个线程执行程序。逻辑非常相似;单线程执行程序也在内部使用队列,但主要区别在于处理结果对象的方式。使用队列允许最终处理阶段的循环(即类似于AWT事件处理工作)。这取决于围绕这个部分的应用哪种方式更好。

import java.util.Random; 
import java.util.concurrent.*; 

public class InOrder 
{ 
    private static final Random rand = new Random(); 

    final static class Event implements Callable<Result> { 
    final int id; 

    public Event (int id) { 
     this.id = id; 
    } 
    public Result call() throws InterruptedException { 
     // arbitrary long computation 
     Thread.sleep(rand.nextInt(10000)); 
     return new Result(id); 
    } 
    } 
    final static class Result { 
    int id; 

    public Result(int id) { 
     this.id = id; 
    } 
    } 
    static final int STOP_ID = -1; 
    private static final ExecutorService POOL = Executors.newFixedThreadPool(10); 
    private static final BlockingQueue<Future<Result>> QUEUE = new ArrayBlockingQueue<>(10); 

    static void processResults() throws InterruptedException, ExecutionException { 
    for(;;) { 
     Result r=QUEUE.take().get(); 
     if(r.id==STOP_ID) return; 
     System.out.println("received result id="+r.id); 
    } 
    } 
    public static void main(String[] args) 
    { 
    POOL.submit(new Callable<Object>() { 
     public Object call() throws Exception { 
     processResults(); 
     return null; 
     } 
    }); 

    for(int id=0; id<100; id++) try { 
     QUEUE.put(POOL.submit(new Event(id))); 
    } catch(InterruptedException ex) { break; } 

    try { QUEUE.put(new EndMarker()); } 
    catch(InterruptedException ex) {} 

    POOL.shutdown(); 
    } 
    static final class EndMarker implements Future<Result> { 
    public boolean cancel(boolean mayInterruptIfRunning) { 
     return false; 
    } 
    public boolean isCancelled() { 
     return false; 
    } 
    public boolean isDone() { 
     return true; 
    } 
    public Result get() { 
     return new Result(STOP_ID); 
    } 
    public Result get(long timeout, TimeUnit unit) { 
     return get(); 
    } 
    } 
}