2017-08-29 51 views
-1

我尝试使用 java-driver-async-queries执行异步查询。我正在修改FutureCallback中的列表,但似乎不起作用 -使用ResultSetFuture修改集合

List<Product> products = new ArrayList<Product>(); 

for (// iterating over a Map) { 
    key = entry.getKey(); 
    String query = "SELECT id,desc,category FROM products where id=?"; 
    ResultSetFuture future = session.executeAsync(query, key); 
    Futures.addCallback(future, 
     new FutureCallback<ResultSet>() { 
      @Override public void onSuccess(ResultSet result) { 
       Row row = result.one(); 
       if (row != null) { 
        Product product = new Product(); 
        product.setId(row.getString("id")); 
        product.setDesc(row.getString("desc")); 
        product.setCategory(row.getString("category")); 

        products.add(product); 
       } 
      } 

      @Override public void onFailure(Throwable t) { 
       // log error 
      } 
     }, 
     MoreExecutors.sameThreadExecutor() 
    ); 
} 

System.out.println("Product List : " + products); // Not printing correct values. Sometimes print blank 

是否有其他方法?

基于米哈伊尔Baksheev答案我执行,现在得到正确的结果。 只是一个转折点。我需要实现一些额外的逻辑。我想知道如果我可以使用List<MyClass>代替List<ResultSetFuture>和MyClass的作为 -

public class MyClass { 

    private Integer   productCount; 
    private Integer   stockCount; 
    private ResultSetFuture result; 
} 

然后同时迭代设置FutureList为 -

ResultSetFuture result = session.executeAsync(query, key.get()); 
MyClass allResult = new MyClass(); 
allResult.setInCount(inCount); 
allResult.setResult(result); 
allResult.setSohCount(values.size() - inCount); 

futuresList.add(allResult); 
+0

什么是“不工作”的定义是什么?发布预期行为和实际行为。 –

+1

你是不是在等你的未来?看起来你创建了一堆期货,并立即打印结果结构。结果结构尚未填充,因为大部分期货仍在运行。 – RussS

+0

谢谢。什么是修正。有没有我可以参考的代码示例? – Saurabh

回答

1

正如@RussS提到,是不是等待所有期货交易代码完成。

有许多方法可以同步异步代码。例如,使用CountDownLatch

编辑: 也请使用separte线程回调和使用并发收集产品。

ConcurrentLinkedQueue<Product> products = new ConcurrentLinkedQueue<Product>(); 
final Executor callbackExecutor = Executors.newSingleThreadExecutor(); 
final CountDownLatch doneSignal = new CountDownLatch(/*the Map size*/); 
for (// iterating over a Map) { 
    key = entry.getKey(); 
    String query = "SELECT id,desc,category FROM products where id=?"; 
    ResultSetFuture future = session.executeAsync(query, key); 
    Futures.addCallback(future, 
     new FutureCallback<ResultSet>() { 
      @Override public void onSuccess(ResultSet result) { 
       Row row = result.one(); 
       if (row != null) { 
        Product product = new Product(); 
        product.setId(row.getString("id")); 
        product.setDesc(row.getString("desc")); 
        product.setCategory(row.getString("category")); 

        products.add(product); 
       } 
       doneSignal.countDown(); 

      } 

      @Override public void onFailure(Throwable t) { 
       // log error 
       doneSignal.countDown(); 
      } 
     }, 
     callbackExecutor 
    ); 
} 

doneSignal.await();   // wait for all async requests to finish 
System.out.println("Product List : " + products); 

另一种方法是收集所有期货清单,然后等待所有结果作为一个未来番石榴的Futures.allAsList,如:

List<ResultSetFuture> futuresList = new ArrayList<>(/*Map size*/); 
     for (/* iterating over a Map*/) { 
      key = entry.getKey(); 
      String query = "SELECT id,desc,category FROM products where id=?"; 
      futuresList.add(session.executeAsync(query, key)); 
     } 

     ListenableFuture<List<ResultSet>> allFuturesResult = Futures.allAsList(futuresList); 
     List<Product> products = new ArrayList<>(); 
     try { 
      final List<ResultSet> resultSets = allFuturesResult.get(); 
      for (ResultSet rs : resultSets) { 
       if (null != rs) { 
        Row row = rs.one(); 
        if (row != null) { 
         Product product = new Product(); 
         product.setId(row.getString("id")); 
         product.setDesc(row.getString("desc")); 
         product.setCategory(row.getString("category")); 

         products.add(product); 
        } 
       } 
      } 
     } catch (InterruptedException | ExecutionException e) { 
      System.out.println(e); 
     } 
     System.out.println("Product List : " + products); 

EDIT 2

我想知道如果我可以使用List而不是List和MyClass作为

技术上是可以的,但你不能在这种情况下,通过在Futures.allAsListList<MyClass>MyClass应该实现ListenableFuture接口

+0

谢谢。我有一个问题在cassandra中做嵌套查询,并按照您的回复https://stackoverflow.com/questions/45471519/improve-performance-in-cassandra-and-java-collections。现在变得间歇性的问题,因为有时我的产品列表变空了。但是,当我在onSuccess方法中打印列表时,它给了我结果。如果我使用await(),那么方法花费很长时间。请建议。 – Saurabh

+0

@Saurabh,我已经提出了我的回答 –

+0

我编辑了我的问题。请看一看。 – Saurabh