2014-07-24 53 views
0

我有两个HashMap s表示,我想在一个类中进行同步synchronized块。 我保持两个地图的原因是,我派遣任务到不同的服务器,和我保持在一个HashMap<String, Task>原始任务对象,并在同一时间,我管理的另一个HashMap<String, HashMap<InetAddress, TaskResponse>响应状态。 HashMaps的密钥是String,这对每个Task对象都是唯一的。与Java while循环和Thread.sleep()方法

所以我的代码是这样的:

HashMap<String, Map<InetAddress, TaskResponse>> taskResponseMap = new HashMap<>(); 
HashMap<String, Task> taskMap = new HashMap<>(); 

public void endpointHasRespondedCallback(TaskResponse taskResponse, InetAddress from) { 
    // Callback threads gets blocked here! 
    synchronized(taskResponseMap) { 
     synchronized (taskMap) { 
      Map<InetAddress, TaskResponse> taskResponses = taskResponseMap.get(taskResponse.taskUuid); 
      if (taskResponses == null || !taskResponses.containsKey(from)) { 
       // The response does not exists probably due to timeout 
       return; 
      } 
      taskResponses.put(from, taskResponse); 
     } 
    } 
} 

public void sendTaskToAllEndpoints(Task task) { 
    long taskStartedAt = System.currentTimeMillis(); 
    HashMap<InetAddress, TaskResponse> taskResponses = new HashMap<>(); 
    taskResponseMap.put(task.taskUuid, taskResponses); 
    taskMap.put(task.taskUuid, task); 

    for (InetAddress dest : getDestinationNodes()) { 
     sendTaskTo(dest, task); 
     messageResponses.put(dest, TaskResponse.emptyTaskResponse()); 
    } 

    // Should wait for response to comeback till the timeout is over 
    while (System.currentTimeMillis() < taskStartedAt + timeoutInMillis) { 
     Thread.sleep(1000); 

     synchronized(taskResponseMap) { 
      synchronized (taskMap) { 
       if(isTaskOver(task.taskUuid)) { 
        Map<InetAddress, TaskResponse> responses = taskResponseMap.remove(task.taskUuid); 
        taskMap.remove(task.taskUuid); 

        task.taskIsDone(responses); 
        return; 
       } 
      } 
     } 
    } 

    // If the task is still sitting there, then it must have timed out! 
    synchronized(taskResponseMap) { 
     synchronized (taskMap) { 
      taskResponseMap.remove(task.taskUuid); 
      taskMap.remove(task.taskUuid); 
     } 
    } 
} 

// Do not synchronize purposefully since it is only being called in synchronized methods 
public boolean isTaskOver(String taskUuid) { 
    Task task = taskMap.get(taskUuid); 
    if (task == null || !taskResponseMap.containsKey(task.taskUuid)) { 
     return true; 
    } else { 
     for (TaskResponse value : taskResponseMap.get(task.taskUuid).values()) { 
      if (value.status != TaskResponseStatus.SUCCESSFUL) { 
       return false; 
      } 
     } 
    } 
    return true; 
} 

因此从概念上解释我的代码做什么,sendTaskToAllEndpoints()方法发送Task对象远程端点,并等待,直到while循环内超时。 只要远程端点响应,它就会执行endpointHasRespondedCallback()方法,以便它标记为在TaskResponseMap中完成。 早在sendTaskToAllEndPoints()方法,我检查的任务是通过使用isTaskDone() helper方法做,如果没有,我只是继续和调用Thread.sleep(1000)等待一秒钟,直到我下一次检查。

我面临的问题是,即使我看到endpointHasRespondedCallback()方法正在执行的所有我调度到(我用日志语句验证)的远程节点,它等待​​块外,只有在那里无论何时在sendTaskToAllEndpoints()方法中发生超时,因此即使所有节点都已正确响应,我的任务仍会超时。

这是我没有想到的,因为即使我在while循环中获取两个对象上的锁,我在睡觉前解锁它,并且我假设sendTaskToAllEndpoints()线程进入睡眠状态时,等待锁的其他线程应获取endpointHasRespondedCallback()方法中的锁,并将Task标记为按预期完成。

我大概可以实现我用更好的方式来解决这个问题的方案,但我想知道什么是对这种行为的合乎逻辑的解释。

+0

你的同步代码看起来很好,尽管你不需要'taskMap'上的同步,因为只有一个线程会改变它。(此外,如果你搞砸了一些东西,那么拥有依赖获取多个锁的同步逻辑是一种简单的方法,最终会导致死锁;你的目标应该是锁定对象的数量更少而不是更多,同时仍然保持线程安全。)你是否肯定没有其他代码(你没有发布)正在获取任何一个对象的锁? – Tim

+0

isTaskOver是公开的,但不是线程安全的!你应该改变为私人。顺便说一句。避免使用循环+ Thread.sleep进行主动等待。更好地使用条件等待(wait + notifyAll)。 – isnot2bad

+0

我为使其工作的唯一更改是通过删除while循环内的同步块。这应该意味着阻止我的另一个线程运行的只是while循环内的synchronized块。我知道在C中,编译器可能[改变正在执行的东西的顺序](http://stackoverflow.com/questions/4437527/why-do-we-use-volatile-keyword-in-c),以优化该程序影响了多线程程序的功能。在Java中也有类似的东西吗? – YShin

回答

0

好的,不要在意发生了什么,我在task.taskIsDone(responses)里面调用了sendTaskToAllEndpoints()的方法,在while循环中有效地保存了锁,直到新的被调用的任务也结束。

我通过创建一个AtomicInteger对象来增加每次我输入同步块时递增的对象,并在我离开时递减它(并将它们封装在try-finally块中)。第一项任务完成后,我观察到AtomicInteger对象没有降到0,并且它们在1和2之间上下移动。

看起来第一个任务成功结束,因为没有阻止回调执行,但一旦第一个任务已完成,并task.taskIsDone()叫,这种方法本身产生的Task另一个实例,并呼吁sendTaskToAllEndpoints(),因此,阻断所有回调调用的第二个任务,并依此类推,直到他们与超时结束。

谢谢大家的建议。