2014-12-02 95 views
0

我创建了一个简单的搜索引擎:并发管理在Java中

public String search(Map<String, Object> params) { 
    String result = /*search*/; 
    return result; 
} 

此方法执行搜索过程并返回结果(JSON),但有一个很大的问题...... 因为搜索过程是昂贵的,如果一些用户同时使用相同的关键字执行搜索过程,搜索过程将分别为每个执行!

我有两个选择,以避免这种情况:

1​​:

private static final ConcurrentHashMap<Object, ConcurrentLinkedQueue<Locker<Object, String>>> CONCURRENT_SEARCHES = new ConcurrentHashMap<>(); 

public String search(Map<String, Object> params) { 
    Object key = params.get("keyword"); 
    assert key != null; 
    ConcurrentLinkedQueue<Locker<Object, String>> queue = CONCURRENT_SEARCHES.get(key); 
    if (queue != null) { 
     System.out.println("waiting"); 
     Locker<Object, String> locker = new Locker<>(key); 
     queue.add(locker); 
     locker.lock(); 
     String result = locker.getValue(); 
     return result == null ? "[]" : result; 
    } 
    System.out.println("new search"); 
    CONCURRENT_SEARCHES.put(key, (queue = new ConcurrentLinkedQueue<>())); 
    String result = /*search*/; 
    CONCURRENT_SEARCHES.remove(key); 
    Locker<Object, String> locker; 
    while ((locker = queue.poll()) != null) { 
     locker.setValue(result); 
     locker.unlock(); 
    } 
    return result; 
} 
:如你所知,这不会使用共享对象恰好解决这个问题...

2-

这工作真的很好,但又有一个小问题。

测试

public static void main(String[] args) { 
    String id = "1"; 
    ProcessSession session = ProcessManager.openSession(id); 
    //... 
    ExecutorService service = Executors.newCachedThreadPool(); 
    for (int i = 0; i < 10; i++) { 
     service.submit(() -> session.search("keyword")); 
    } 
} 

输出

new search 
waiting 
waiting 
new search 
waiting 
waiting 
waiting 
waiting 
waiting 
waiting 

编辑:

public final class Locker<K, V> { 

    private final Object o = new Object(); 
    private final K key; 
    private V value; 

    public Locker(K key) { 
     this.key = key; 
    } 

    public K getKey() { 
     return key; 
    } 

    public V getValue() { 
     return value; 
    } 

    public void setValue(V value) { 
     this.value = value; 
    } 

    public void lock() { 
     synchronized (o) { 
      try { 
       o.wait(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public void lock(long timeout) { 
     synchronized (o) { 
      try { 
       o.wait(timeout); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public void unlock() { 
     synchronized (o) { 
      o.notify(); 
     } 
    } 

} 

这是怎么回事错在这里?任何人都可以提出一个更好的解

谢谢。

+0

那么,您缓存搜索,而不是搜索结果。因此,如果没有正在进行的关键字搜索,即使之前进行了类似的搜索,也会启动新的关键字。 – kiheru 2014-12-02 11:39:43

+0

搜索结果不固定,可能由于某些原因而稍后更改... – FaNaJ 2014-12-02 11:42:49

+0

那么测试运行时会出现什么问题?大多数搜索与以前的搜索结合在一起。这不是你想要做的吗? – kiheru 2014-12-02 11:48:16

回答

0

谢谢,问题就迎刃而解了:

private static final Map<Object, List<Locker<Object, String>>> CONCURRENT_SEARCHES = new ConcurrentHashMap<>(); 


public String search(Map<String, Object> params) { 
    Object key = params.get("keyword"); 
    assert key != null; 
    Locker<Object, String> locker = new Locker<>(key); 
    if (isInProgress(key, locker)) { 
     locker.lock(); 
     String result = locker.getValue(); 
     return result == null ? "[]" : result; 
    } 
    String result = /*search*/; 
    finished(key, result); 
    return result; 
} 

private static synchronized boolean isInProgress(Object key, Locker<Object, String> locker) { 
    List<Locker<Object, String>> list = CONCURRENT_SEARCHES.get(key); 
    if (list != null) { 
     list.add(locker); 
     return true; 
    } 
    CONCURRENT_SEARCHES.put(key, Collections.synchronizedList(new ArrayList<>())); 
    return false; 
} 

private static synchronized void finished(Object key, String result) { 
    Optional.of(CONCURRENT_SEARCHES) 
      .map(searches -> searches.remove(key)) 
      .ifPresent(list -> { 
       list.stream().forEach(locker -> { 
        locker.setValue(result); 
        locker.unlock(); 
       }); 
       list.clear(); 
      }); 
} 
0

使用并发收集的东西只会在细粒度的级别上进行锁定,所以你不会获得太多好处。

我会设计搜索有点不同。这里的想法应该是设计你的search方法,使得它的两次调用相互排斥。我想到的一个问题就是在这里使用生产型消费模式。

  • Search方法为每次调用创建一个worker,并且该worker被无限执行器服务执行。
  • 另一种方法是保持工人队列。 search方法创建工作人员并将其添加到队列中,另一个执行人员服务继续选择工作人员表单队列以执行并返回结果。

在所有情况下,这个想法是让搜索操作的实例互相排斥。如果可以的话,也缓存这些东西。

0

除了使用锁和同步块之外,还可以使用标志来获得更好的性能。

发生第一次搜索时,在搜索关键字为关键字的地图中设置一个标记“进行中”。 因此,当重复搜索时,您可以查找此地图,并决定等待或进行长时间轮询以从第一次搜索中获取结果。

一旦搜索完成,缓存结果如果可能并删除标志。

您可以使用套接字编程,使用高级消息传递将结果推送给具有相同搜索关键字的用户。这将节省资源并帮助实现更好的性能。