2012-12-19 55 views
1

爪哇7,Glassfish的3.1.2Anynchronous多线程消息处理

输入像信息:

public class Message { 

    private final String contextId; 
    private final String name; 
    ... 
} 

此消息应该由工人进行处理。对于具有新contextId 的消息,应该启动一个新线程。对于已存在的contextId,使用已存在的线程。已经存在的线程应该使用相同的contextId顺序来处理消息。

Hier我的最后,不工作,工人版本。

@Stateless 
@LocalBean 
public class Worker { 

private static final Map<String, Future<Result>> MAP = new ConcurrentHashMap<>(); 
@EJB 
private Worker worker; 

@Asynchronous 
public void work(Message message) { 
    System.out.println(Thread.currentThread().getName() + ": A message: " + message.toString()+ " should be processed"); 
    Future<Result> sameContext = MAP.get(message.getContextId()); 
    if (sameContext != null) { 
     waitForSameContextId(message, sameContext); 
    } 
    MAP.put(message.getContextId(), worker.doWork(message)); 
} 

@Asynchronous 
public Future<Result> doWork(Message message) { 
    System.out.println(Thread.currentThread().getName() + ": Processing the message: " + message.toString()); 

    AsyncResult<Result> asyncResult = new AsyncResult<>(new Result()); 
    try { 
     Thread.sleep(15000); 
    } catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 

    MAP.remove(message.getContextId()); //We are done removing 
    System.out.println(Thread.currentThread().getName() + ": The message: " + message.toString()+ " was processed"); 
    return asyncResult; 
} 

private void waitForSameContextId(Message message, Future<Result> result) { 
    try { 
     System.out.println(Thread.currentThread().getName() + ": message with id: " + message.toString() 
       + " is already in work, blocking Thread until it is finished"); 
     Result get = result.get(); //blocks thread 
    } catch (InterruptedException | ExecutionException ex) { 
     ex.printStackTrace(); 
     // Do some failure management 
    } 
} 

测试类:

public class MessageReceiver { 

private static String ID = "#########"; 

@EJB 
private Worker worker; 

public void receive(Message message) { 

    worker.work(message); 
} 

@PostConstruct 
void init() { 

    receive(new Message(ID, "message 1")); 
    receive(new Message(ID, "message 2")); 
    receive(new Message(ID, "message 3")); 
... 
} 

回答