2017-03-09 108 views
1

我正在研究一个库,它将采用一个对象DataRequest作为输入参数并基于该对象构建一个URL,然后调用我们的应用程序服务器使用apache http客户端,然后将响应返回给使用我们库的客户。有些客户会拨打executeSync方法获得相同的功能,一些客户会拨打我们的executeAsync方法来获取数据。在多线程环境中并行执行每个子任务

  • executeSync() - 等待,直到我有一个结果,返回结果。
  • executeAsync() - 返回一个Future,如果需要,可以在其他事情完成后立即处理。

下面是我DataClient类具有以上两种方法:

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16); 
    private CloseableHttpClient httpClientBuilder; 

    // initializing httpclient only once 
    public DataClient() { 
    try { 
     RequestConfig requestConfig = 
      RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500) 
       .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build(); 
     SocketConfig socketConfig = 
      SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build(); 

     PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = 
      new PoolingHttpClientConnectionManager(); 
     poolingHttpClientConnectionManager.setMaxTotal(300); 
     poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200); 

     httpClientBuilder = 
      HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager) 
       .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build(); 
    } catch (Exception ex) { 
     // log error 
    } 
    } 

    @Override 
    public List<DataResponse> executeSync(DataRequest key) { 
    List<DataResponse> responsList = null; 
    Future<List<DataResponse>> responseFuture = null; 

    try { 
     responseFuture = executeAsync(key); 
     responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit()); 
    } catch (TimeoutException | ExecutionException | InterruptedException ex) { 
     responsList = 
      Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, 
       DataStatusEnum.ERROR)); 
     responseFuture.cancel(true); 
     // logging exception here 
    } 
    return responsList; 
    } 

    @Override 
    public Future<List<DataResponse>> executeAsync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.submit(task); 
    } 
} 

下面是我DataFetcherTask类也有一个静态类DataRequestTask它通过使URL调用我们的应用服务器:

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> { 
    private final DataRequest key; 
    private final CloseableHttpClient httpClientBuilder; 

    public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) { 
    this.key = key; 
    this.httpClientBuilder = httpClientBuilder; 
    } 

    @Override 
    protected List<DataResponse> compute() { 
    // Create subtasks for the key and invoke them 
    List<DataRequestTask> requestTasks = requestTasks(generateKeys()); 
    invokeAll(requestTasks); 

    // All tasks are finished if invokeAll() returns. 
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size()); 
    for (DataRequestTask task : requestTasks) { 
     try { 
     responseList.add(task.get()); 
     } catch (InterruptedException | ExecutionException e) { 
     Thread.currentThread().interrupt(); 
     return Collections.emptyList(); 
     } 
    } 
    return responseList; 
    } 

    private List<DataRequestTask> requestTasks(List<DataRequest> keys) { 
    List<DataRequestTask> tasks = new ArrayList<>(keys.size()); 
    for (DataRequest key : keys) { 
     tasks.add(new DataRequestTask(key)); 
    } 
    return tasks; 
    } 

    // In this method I am making a HTTP call to another service 
    // and then I will make List<DataRequest> accordingly. 
    private List<DataRequest> generateKeys() { 
    List<DataRequest> keys = new ArrayList<>(); 
    // use key object which is passed in contructor to make HTTP call to another service 
    // and then make List of DataRequest object and return keys. 
    return keys; 
    } 

    /** Inner class for the subtasks. */ 
    private static class DataRequestTask extends RecursiveTask<DataResponse> { 
    private final DataRequest request; 

    public DataRequestTask(DataRequest request) { 
     this.request = request; 
    } 

    @Override 
    protected DataResponse compute() { 
     return performDataRequest(this.request); 
    } 

    private DataResponse performDataRequest(DataRequest key) { 
     MappingHolder mappings = DataMapping.getMappings(key.getType()); 
     List<String> hostnames = mappings.getAllHostnames(key); 

     for (String hostname : hostnames) { 
     String url = generateUrl(hostname); 
     HttpGet httpGet = new HttpGet(url); 
     httpGet.setConfig(generateRequestConfig()); 
     httpGet.addHeader(key.getHeader()); 

     try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) { 
      HttpEntity entity = response.getEntity(); 
      String responseBody = 
       TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(), 
        StandardCharsets.UTF_8); 

      return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK); 
     } catch (IOException ex) { 
      // log error 
     } 
     } 
     return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR); 
    } 
    } 
} 

对于每个DataRequest对象,都有一个DataResponse对象。现在有人通过传递DataRequest对象调用我们的库,在内部我们制作List<DataRequest>对象,然后我们并行调用每个对象DataRequest,并返回List<DataResponse>,其中列表中的每个DataResponse对象都会响应对应的对象DataRequest对象。

下面是流量:

  • 客户将通过传递DataRequest对象调用DataClient类。他们可以根据他们的要求调用executeSync()executeAsync()方法。
  • 现在在DataFetcherTask类(这是ForkJoinTask's亚型一个RecursiveTask一个),给定key对象,它是一个单一的DataRequest,我会产生List<DataRequest>,然后调用每个子任务并行为列表中的每个对象DataRequest。这些子任务与父任务在相同的ForkJoinPool中执行。
  • 现在在DataRequestTask类中,我通过创建URL并将其DataResponse对象返回来执行每个DataRequest对象。

问题陈述:

因为这个库被称为一个非常高的吞吐量的环境,因此必须非常快。对于同步调用,在单独的线程中执行可以吗?这将导致线程的额外成本和资源以及线程的上下文切换成本,所以我有点混淆。此外,我在这里使用ForkJoinPool这将节省我使用额外的线程池,但它是在这里正确的选择?

有没有更好的和有效的方式来做同样的事情,也可以提高性能?我使用的是Java 7,并且可以访问Guava库,所以如果它可以简化任何事情,那么我也可以开放它。

看起来我们在负载很重时会看到一些争用。有没有什么办法可以使这个代码在非常重的负载下运行时进入线程争用?

+0

听起来像[ThreadPool](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html)会很有用,但请记住,过早优化是所有邪恶的来源 –

+0

@ScaryWombat同意,这就是为什么我会做负载测试,但问题是我使用ForkJoinPool这也是ThreadPool的专用形式是合理的。然后,我使用executeSync方法的方式是否正确? – john

+0

你看到了什么样的争用?也许是重负载新的ForkJoinPool(16);'是不够的,尝试增加'16'到一个更大的值 – Teg

回答

0

我认为在您的情况下,最好使用异步http调用,请参阅链接:HttpAsyncClient。而且你不需要使用线程池。

在executeAsync方法创建空CompletableFuture <DataResponse>(),并把它传递给客户打电话,在回拨电话有通过调用完成它(或completeExceptionally如果异常募集)设置completableFuture的结果。 ExecuteSync方法实现看起来不错。

编辑:

对于Java 7它是只需要更换一个completableFuture承诺番石榴或实施,像ListenableFuture任何类似

0

使用ForkJoinPool是正确的选择,其设计效率与许多小任务:

一个ForkJoinPool不同于其他类型的ExecutorService主要依靠雇用偷工减料:池中的所有线程试图找到一个d执行提交给池的任务和/或由其他活动任务创建的任务(如果不存在,最终会阻止等待工作)。这可以在大多数任务产生其他子任务时(如同大多数ForkJoinTasks一样)以及从外部客户端将许多小任务提交给池时实现高效处理。特别是在构造函数中将asyncMode设置为true时,ForkJoinPools可能适合用于永不连接的事件样式任务。

我建议尝试在构造函数中asyncMode = true因为你的情况的任务从未加入:

public class DataClient implements Client { 
    private final ForkJoinPool forkJoinPool = new ForkJoinPool(16, ForkJoinPool.ForkJoinWorkerThreadFactory, null, true); 
... 
} 

对于executeSync()可以使用forkJoinPool.invoke(task),这是做了同步的管理方式在游泳池任务执行的资源优化:

@Override 
public List<DataResponse> executeSync(DataRequest key) { 
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder); 
    return this.forkJoinPool.invoke(task); 
} 

如果你可以使用Java 8再有就是已经优化公共池:ForkJoinPool.commonPool()

+0

你能给我一个例子,我应该使用'asyncMode = true'吗?另外我的'executeSync()'方法是怎么样的?对此有点混淆。 – john

+0

我在回答中添加了一些示例 – Teg

+0

因此,这意味着我不需要在'executeSync'内调用'executeAsync'方法并按照您的建议进行操作。如果是,那么超时现在如何进入画面?我的意思是同步如果调用超时,然后我返回超时响应。这将如何工作? – john