2017-04-13 32 views
1

我们使用操作链Java8流处理事件的流。作为处理的一部分,我们希望跟踪事件的数量及其状态以进行测试和监控。以下是我们的用例的简化示例,可以打印给定日期流的星期几。如何处理Java8并行流中的状态变量?

public class StreamStateHandling { 

    private static enum Status {RECEIVED, SUCCESS, ERROR}; 

    private Map<Status,Integer> results = new EnumMap<>(Status.class); 

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy"); 


    private static Optional<LocalDate> parseDate(String dateString){ 
    LocalDate localDate = null; 
    try { 
     localDate = LocalDate.from(formatter.parse(dateString)); 
    }catch (DateTimeParseException e){ 
     return Optional.empty(); 
    } 
    return Optional.of(localDate); 
    } 

    private void doWork(){ 
    Stream.of("12/31/2014", 
      "01-01-2015", 
      "12/31/2015", 
      "not a date", 
      "01/01/2016") 
      //.parallel() 
      .peek(v -> addResult(Status.RECEIVED)) 
      .map(StreamStateHandling::parseDate) 
      .peek(v -> {if (!v.isPresent()) addResult(Status.ERROR);}) 
      .filter(Optional::isPresent) 
      .map(Optional::get) 
      .map(DayOfWeek::from) 
      .peek(v -> addResult(Status.SUCCESS)) 
      .forEach(System.out::println); 

     System.out.println(results); 
    } 
    public static void main(String args[]) { 
    new StreamStateHandling().doWork(); 
    } 

    private void addResult(Status status){ 

    int current = results.getOrDefault(status, 0); 
    results.put(status, current + 1); 
    } 
} 

基本上我们正在跟踪地图中的状态计数。这在单线程处理中工作正常,但在并行流中产生非确定性输出。

在现实世界中,我们有几个状态和操作的链条。一般来说,测试流程和跟踪进度的最佳方式是什么?喜欢香草Java8的实现,但确定如果使用开源库更容易。

会感谢你的帮助。

+0

'.peek.foreach'似乎是多余的。你可以将'forEach'变成一个满足两个角色的函数。 – the8472

回答

3

EnumMap是不是线程安全的,也不是在addResult()读 - 修改 - 写逻辑。尝试使用原子ConcurrentHashMap.merge()来增加计数:

private Map<Status, Integer> results = new ConcurrentHashMap<>(); 

private void addResult(Status status) { 
    results.merge(status, 1, Integer::sum); 
} 
+0

不错!我不知道'merge()'函数。在真正的实现中,我必须通过两个维度来跟踪计数,并且我正在使用不支持同步实现的谷歌guava表。一般来说,这是一个好方法吗?我们试图通过使用并发处理来加快速度,但同步跟踪状态可能会使其变慢。 – Raja

+3

@Raja我可以考虑几个选项。 a)使用具有复合键而不是“表”的'ConcurrentHashMap'。 b)用'AtomicInteger'预填充你的表,它可以安全地增加而不锁定。 c)围绕'addResult()'进行同步。正如您所提到的,根据您的流的其余部分发生多少处理,这可能是一个严重的并行化瓶颈。 – shmosel

+1

更一般地,我认为这是值得怀疑您的方案是否是一个很好的使用情况一般流。流应该通常具有有限的副作用并产生单一的总体结果。您正尝试从单个流生成多个结果值。 – shmosel