2016-05-29 38 views
0

我需要从SomeObjectHandler实现handle()方法,该方法将SomeObject委托给某个外部系统(见下文)。具有正确的hashCode和equals方法的SomeObject。方法句柄(SomeObject someObject)可以从多个线程中调用(例如10)。外部系统可以同时操作不等于someObject,但是如果系统试图同时使用相同的someObject操作,它会中断。我需要实现这个类来防止同等处理someObject。即使一些someObject是相等的,它们都应该被处理。发送来自Java中多线程的消息

现在,我想我需要从并发库中使用类似队列的东西,但我不知道是哪一个。

UPD:我只需要使用标准的Java库。如果有可能达到最大吞吐量。

回答

0

我不是100%确定,如果我完全得到你的问题,但我认为有多种方法可以解决这个问题。

1)如前所述,您可以使用队列插入对象,并确保外部系统以同步方式处理对象,因为如您所说,外部系统无法同时处理相同的对象。

2)在发件人代码本身处理。我已经多次尝试过。这是一段代码片段。这种方法的好处是,它只能同步相同的对象。只要确保在最终块中处理移除部分。对不起,如果代码不整洁。我是新的:)

Map<SomeObject,Integer> objMap=new ConcurrentHashMap<SomeObject,Integer>(); 

public void handle(SomeObject someObject) { 
synchronized(this.class) 
{ 
Integer count=objMap.get(someObject); 
if(count==null) 
{ 
    count=0; 
} 
objMap.put(someObject,++count); 
} 

synchronized(objectMap.get(someObject) 
{ 
    outerSystem.process(someObject); 

    Integer count=objMap.get(someObject); 
    if(count>1) 
{ 
    objMap.put(someObject,--count); 
} 
else 
{ 
    objectMap.remove(someObject); 
} 
} 

} 
+0

谢谢你的尝试,像这样的东西可能是工作。但我认为它有更优化的解决方案。这将是很好的实现最大吞吐量。如果我们会对每个动作进行同步/锁定,可能会大大降低吞吐量 – AskProgram

0

RxJava可以帮助这里。它非常擅长处理特别涉及异步转换的数据流,并且在需要时可以在封面下讨论排队(不会通过同步修饰符进行阻塞!)。为了解决你的问题,我会做这样的事情:

public class SomeHandler{ 

    private final OuterSystem outerSystem; 

    private final PublishSubject<SomeObject> subject; 

    public SomeHandler() { 
     subject 
      // handle calls from multiple threads (non-blocking) 
      .serialized() 
      // buffer in memory if not keeping up  
      .onBackpressureBuffer() 
      // use equals/hashCode to order messages (the queues you referred to) 
      .groupBy(x -> x) 
      .flatMap(g -> 
       g.doOnNext(x -> outerSystem.process(x)) 
       // process groups in parallel 
       .subscribeOn(Schedulers.computation())) 
      // do something if an error occurs 
      .doOnError(e -> e.printStackTrace()) 
      // start consuming data when arrives 
      .subscribe(); 
    } 

    public void handle(SomeObject someObject) { 
     subject.doOnNext(someObject); 
    } 
} 
+0

谢谢您的回答,它确实可行。但我只需要使用标准的Java库(这是我的错误,我不得不写它) – AskProgram

0

如果我正确理解你的问题,你需要保证“相等的对象”的串行执行,而“不相等的对象”可以在parallell进行处理。实现这一目标的一种方法是安排N个处理器,并根据对象的某些确定性特征来分散工作负载。

在你的情况,如果两个对象是相等的,它们的散列码必须相等,所以hasCode() modulo N可用于在N个执行人,其中每个执行人只包含单线程分散负载:

public class SomeHandler { 
    static int N = ...; 
    // Each executor is an Executors.newSingleThreadScheduledExecutor() 
    Executor[N] executors = ....; 
    OuterSystem system; 

    public void handle(SomeObject so) { 
     executors[so.hashCode() % N].execute(() -> system.process(so)); 
    } 
} 
+0

好方法。 )应该是顺便说一句,因为OP说'handle'是从多个线程中调用的,所以我会把'SomeHandler'中的所有字段放在最后。 –