2010-02-22 96 views
17

我有一个Java线程像下面这样:返回值从Java线程

public class MyThread extends Thread { 
     MyService service; 
     String id; 
     public MyThread(String id) { 
      this.id = node; 
     } 
     public void run() { 
      User user = service.getUser(id) 
     } 
    } 

我有大约300 IDS,每几秒钟 - 我火了线程,使每个ID的呼叫。例如。现在

for(String id: ids) { 
    MyThread thread = new MyThread(id); 
    thread.start(); 
} 

,我想从每个线程收集结果,并做了批量插入到数据库中,而不是使300数据库插入每2秒。

任何想法我可以做到这一点?

回答

17

如果你想要做的数据库更新前收集所有的结果,你可以使用invokeAll方法。如果您一次提交一个任务,这将照顾到需要的簿记,如daveb暗示。

private static final ExecutorService workers = Executors.newCachedThreadPool(); 

... 

Collection<Callable<User>> tasks = new ArrayList<Callable<User>>(); 
for (final String id : ids) { 
    tasks.add(new Callable<User>() 
    { 

    public User call() 
     throws Exception 
    { 
     return svc.getUser(id); 
    } 

    }); 
} 
/* invokeAll blocks until all service requests complete, 
* or a max of 10 seconds. */ 
List<Future<User>> results = workers.invokeAll(tasks, 10, TimeUnit.SECONDS); 
for (Future<User> f : results) { 
    User user = f.get(); 
    /* Add user to batch update. */ 
    ... 
} 
/* Commit batch. */ 
... 
+0

就我而言,某些服务调用可能永远不会返回或返回时间太长。所以'invokeAll'和'awaitTermination(长时间超时)'看起来像是要走的路。因此我接受了这个答案。 希望我也能接受@ daveb的回答。 – Langali 2010-02-22 22:11:08

+0

在这种情况下,您可以使用带有超时参数的'invokeAll'的重载版本。我会更新我的答案以显示如何。 – erickson 2010-02-22 22:15:27

+0

谢谢。我不知何故忽略了它。 – Langali 2010-02-22 22:18:34

34

的典型方法是使用一个CallableExecutorServicesubmitCallableExecutorService返回(类型安全)Future从中可以get结果。

class TaskAsCallable implements Callable<Result> { 
    @Override 
    public Result call() { 
     return a new Result() // this is where the work is done. 
    } 
} 

ExecutorService executor = Executors.newFixedThreadPool(300); 
Future<Result> task = executor.submit(new TaskAsCallable()); 
Result result = task.get(); // this blocks until result is ready 

在你的情况,你可能想为你添加任务执行人使用invokeAll返回的Futures一个List,或创建一个列表你自己。要收集结果,只需致电get

+0

好后,一为细节。 – fastcodejava 2010-02-22 21:56:51

1

您需要将结果存储为类似单例的结果。这必须正确同步。

编辑:我知道这不是最好的建议,因为处理生Threads不是个好主意。但考虑到这个问题会起作用,不是吗?我可能没有投票,但为什么投票呢?

1

你可以创建你传递给你创建的线程队列或列表,线程的结果添加到它获取由消费者执行的批量插入清空列表。

1

最简单的方法是将对象传递给每个线程(每个线程一个对象)以后将包含结果。主线程应该保留对每个结果对象的引用。当所有线程连接在一起时,您可以使用结果。

1
public class TopClass { 
    List<User> users = new ArrayList<User>(); 
    void addUser(User user) { 
     synchronized(users) { 
      users.add(user); 
     } 
    } 
    void store() throws SQLException { 
     //storing code goes here 
    } 
    class MyThread extends Thread { 
      MyService service; 
      String id; 
      public MyThread(String id) { 
       this.id = node; 
      } 
      public void run() { 
       User user = service.getUser(id) 
       addUser(user); 
      } 
     } 
} 
1

您可以创建一个扩展Observable的类。然后你的线程可以调用Observable类中的方法,通过调用Observable.notifyObservers(Object)来通知在该观察者中注册的任何类。

观测类将执行观测,并与可观察的注册。然后,您将实现一个更新(Observable,Object)方法,当Observerable.notifyObservers(Object)被调用时被调用。

4

将结果存储在您的对象中。当它完成后,让它自己进入同步集合(想到同步队列)。

当你想收集你的结果提交,抓住一切从队列中,并从对象看你的结果。你甚至可以让每个对象知道如何将自己的结果“发布”到数据库中,这样就可以提交不同的类,并且可以使用完全相同的微小优雅循环来处理所有类。

JDK中有很多工具可以帮助解决这个问题,但是一旦开始将线程看作是一个真正的对象,而不是围绕“运行”方法的一堆废话,这真的很容易。一旦你开始思考对象,这种编程变得更简单和更令人满意。

+0

+1。我最初的想法很相似,但Java 5现在变得更简单了! – Langali 2010-02-22 22:41:07

+0

+1。那好美丽。 – 2010-02-22 22:53:09

+0

这是针对一个B类地址范围的系统做的。它非常优雅,从几小时减少到几分钟。 (好吧,从Ping也变成了一个SNMP get,Java并不真正支持Ping) – 2010-02-23 00:55:12

2

在Java8中,使用CompletableFuture有更好的方法。假设我们有类从数据库中获取的ID,为简单起见,我们可以只返回下面的数字,

static class GenerateNumber implements Supplier<Integer>{ 

    private final int number; 

    GenerateNumber(int number){ 
     this.number = number; 
    } 
    @Override 
    public Integer get() { 
     try { 
      TimeUnit.SECONDS.sleep(1); 
     }catch (InterruptedException e){ 
      e.printStackTrace(); 
     } 
     return this.number; 
    } 
} 

现在我们可以将结果添加到并发收集一次未来的结果是准备好了。

Collection<Integer> results = new ConcurrentLinkedQueue<>(); 
int tasks = 10; 
CompletableFuture<?>[] allFutures = new CompletableFuture[tasks]; 
for (int i = 0; i < tasks; i++) { 
    int temp = i; 
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()-> new GenerateNumber(temp).get(), executor); 
    allFutures[i] = future.thenAccept(results::add); 
} 

现在,我们可以添加一个回调时,所有的期货准备就绪,

CompletableFuture.allOf(allFutures).thenAccept(c->{ 
    System.out.println(results); // do something with result 
});