2016-04-22 30 views
1

我想创建一个信号量来防止某个方法一次执行超过1x。如何释放信号并让任何线程继续?

如果任何其他线程请求访问,应该等到信号被释放:

private Map<String, Semaphore> map; 

public void test() { 
    String hash; //prevent to run the long running method with the same hash concurrently 
    if (map.contains(hash)) { 
     map.get(hash).aquire(); //wait for release of the lock 
     callLongRunningMethod(); 
    } else { 
     Semaphore s = new Semaphore(1); 
     map.put(hash, s); 
     callLongRunningMethod(); 
     s.release(); //any number of registered threads should continue 
     map.remove(hash); 
    } 
} 

问题:我如何可以锁定只有一个线程的信号,但其释放,使任何数线程可以在发布后立即继续?

一些澄清:

想象的长时间运行的方法是事务性的方法。查看数据库。如果找不到任何条目,则会发送沉重的XML请求并将其保存到db。另外,也许可能会触发进一步的异步处理,因为这应该是数据的“初始获取”。然后从DB返回对象(在该方法内)。如果DB条目已经存在,它将直接返回实体。

现在,如果多个线程同时访问长时间运行的方法,所有方法都会获取沉重的XML(流量,性能),并且它们都会尝试将相同的对象保存到数据库中(因为长时间运行方法是事务性的)。导致例如非唯一例外。加上所有这些触发可选的异步线程。

当只有一个线程被锁定时,只有第一个线程负责持久化该对象。然后,完成后,所有其他线程都会检测到该条目已经存在于数据库中,并只提供该对象。

+2

为什么不把'callLongRunningMethod'放在'synchronized(this){...}'块中?或'同步(哈希)'? (或者,如果它不是私有的,并且/或者从除此方法以外的其他地方调用,则使'调用LongRunningMethod本身'同步') –

+0

“散列”应该来自哪里,它的意图是什么? – zapl

+0

因为该方法可能正在同时运行*如果散列不同。我可能只是没有相同的散列值。 – membersound

回答

0

据我了解你的需要,你希望能够确保任务是第一次由一个单线程执行,那么你想允许多个线程执行它,如果你需要依靠CountDownLatch作为下一个:

下面是如何将其与CountDownLatch实现:

private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>(); 

public void test(String hash) { 
    final CountDownLatch latch = new CountDownLatch(1); 
    final CountDownLatch previous = map.putIfAbsent(hash, latch); 
    if (previous == null) { 
     try { 
      callLongRunningMethod(); 
     } finally { 
      map.remove(hash, latch); 
      latch.countDown(); 
     } 
    } else { 
     try { 
      previous.await(); 
      callLongRunningMethod(); 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 
+2

'Semaphore s = map.computeIfAbsent(hash,k - > new Semaphore(1));'在Java 8中更容易+。 –

+0

不会's.release()'返回锁的计数(这里是'1')?我想允许任何等待的线程继续,而不仅仅是1. – membersound

+0

@AndyTurner好的一个谢谢你,我更新了回答 –

0

我认为你可以通过使用非常高的permit数字(高于线程数量,例如2000000)来达到这个目的。

然后在函数中应该只能运行acquire完整的许可证号(acquire(2000000))而在其他线程中您只有一个许可证acquire

2

据我所知,在这里你不需要使用Semaphore。相反,你应该使用ReentrantReadWriteLock。此外,test方法不是线程安全的。

下面的示例是你的逻辑使用RWL

private ConcurrentMap<String, ReadWriteLock> map = null; 

void test() { 
    String hash = null; 
    ReadWriteLock rwl = new ReentrantReadWriteLock(false); 
    ReadWriteLock lock = map.putIfAbsent(hash, rwl); 

    if (lock == null) { 
     lock = rwl; 
    } 

    if (lock.writeLock().tryLock()) { 
     try { 
      compute(); 
      map.remove(hash); 
     } finally { 
      lock.writeLock().unlock(); 
     } 
    } else { 
     lock.readLock().lock(); 
     try { 
      compute(); 
     } finally { 
      lock.readLock().unlock(); 
     } 
    } 
} 

在这段代码的实现,第一个成功的线程将收购WriteLock而其他Thread s就等待写锁的释放。在发布WriteLock之后,等待发布的所有Thread将同时进行。

+0

注意:您的方法不是线程安全的,因为调用'map.get'和'map.put'的顺序不是原子的。 –

+0

yeap,你是对的 – hahn

+0

我怎么能让+加线程安全? – membersound

-1

我最终使用CountDownLatch如下:

private final ConcurrentMap<String, CountDownLatch> map = new ConcurrentHashMap<>(); 

public void run() { 
     boolean active = false; 
     CountDownLatch count = null; 

     try { 
      if (map.containsKey(hash)) { 
       count = map.get(hash); 
       count.await(60, TimeUnit.SECONDS); //wait for release or timeout 
      } else { 
       count = new CountDownLatch(1); 
       map.put(hash, count); //block any threads with same hash 
       active = true; 
      } 

      return runLongRunningTask(); 
     } finally { 
      if (active) { 
       count.countDown(); //release 
       map.remove(hash, count); 
      } 
     } 
} 
+1

构建的弱值,最终会绕过#containsKey并将CDL放入地图。这个块不是原子的。因此,你最终会多次调用#runLongRunningTask() – hahn

+0

好吧,你必须将这个调用包装在'synchronized'中吗? – membersound

+0

看看我的还是Andy Turner的回答。他们都解决了这个问题。选择最符合您需求的答案。 – hahn

0

我认为,最简单的方法,这将使用一个ExecutorServiceFuture做:

class ContainingClass { 
    private final ConcurrentHashMap<String, Future<?>> pending = 
    new ConcurrentHashMap<>(); 
    private final ExecutorService executor; 

    ContainingClass(ExecutorService executor) { 
    this.executor = executor; 
    } 

    void test(String hash) { 
    Future<?> future = pending.computeIfAbsent(
     hash, 
     () -> executor.submit(() -> longRunningMethod())); 

    // Exception handling omitted for clarity. 
    try { 
     future.get(); // Block until LRM has finished. 
    } finally { 
     // Always remove: in case of exception, this allows 
     // the value to be computed again. 
     pending.values().remove(future); 
    } 
    } 
} 

Ideone Demo

去除值未来是线程安全的,因为computeIfAbsentremove是原子的:要么computeIfAbsentremove之前运行,在这种情况下,返回现有的将来,并立即完成;或者之后运行,并添加新的未来,导致新的呼叫longRunningMethod

注意,它从pending.values()消除了未来,而不是从pending直接:请看下面的例子:

  • 线程1和线程2同时运行
  • 线程1次完成,并删除该值。
  • 线程3运行,为地图添加新的未来
  • 线程2完成并尝试删除该值。

如果未来通过键从地图中删除,线程2将删除线程3的未来,这是与线程2未来不同的实例。

这简化了longRunningMethod过,因为它不再需要做了“检查,如果我需要做什么”为阻塞的线程:该Future.get()已在阻塞线程成功完成足以表明,没有额外的工作是需要的。

+0

我认为这不符合要求 – hahn

+0

请注意我提到的有关在阻塞线程完成后实际上不需要在阻塞的线程中运行LRM。 –