我有两个HashMap
s表示,我想在一个类中进行同步synchronized块。 我保持两个地图的原因是,我派遣任务到不同的服务器,和我保持在一个HashMap<String, Task>
原始任务对象,并在同一时间,我管理的另一个HashMap<String, HashMap<InetAddress, TaskResponse>
响应状态。 HashMaps
的密钥是String
,这对每个Task
对象都是唯一的。与Java while循环和Thread.sleep()方法
所以我的代码是这样的:
HashMap<String, Map<InetAddress, TaskResponse>> taskResponseMap = new HashMap<>();
HashMap<String, Task> taskMap = new HashMap<>();
public void endpointHasRespondedCallback(TaskResponse taskResponse, InetAddress from) {
// Callback threads gets blocked here!
synchronized(taskResponseMap) {
synchronized (taskMap) {
Map<InetAddress, TaskResponse> taskResponses = taskResponseMap.get(taskResponse.taskUuid);
if (taskResponses == null || !taskResponses.containsKey(from)) {
// The response does not exists probably due to timeout
return;
}
taskResponses.put(from, taskResponse);
}
}
}
public void sendTaskToAllEndpoints(Task task) {
long taskStartedAt = System.currentTimeMillis();
HashMap<InetAddress, TaskResponse> taskResponses = new HashMap<>();
taskResponseMap.put(task.taskUuid, taskResponses);
taskMap.put(task.taskUuid, task);
for (InetAddress dest : getDestinationNodes()) {
sendTaskTo(dest, task);
messageResponses.put(dest, TaskResponse.emptyTaskResponse());
}
// Should wait for response to comeback till the timeout is over
while (System.currentTimeMillis() < taskStartedAt + timeoutInMillis) {
Thread.sleep(1000);
synchronized(taskResponseMap) {
synchronized (taskMap) {
if(isTaskOver(task.taskUuid)) {
Map<InetAddress, TaskResponse> responses = taskResponseMap.remove(task.taskUuid);
taskMap.remove(task.taskUuid);
task.taskIsDone(responses);
return;
}
}
}
}
// If the task is still sitting there, then it must have timed out!
synchronized(taskResponseMap) {
synchronized (taskMap) {
taskResponseMap.remove(task.taskUuid);
taskMap.remove(task.taskUuid);
}
}
}
// Do not synchronize purposefully since it is only being called in synchronized methods
public boolean isTaskOver(String taskUuid) {
Task task = taskMap.get(taskUuid);
if (task == null || !taskResponseMap.containsKey(task.taskUuid)) {
return true;
} else {
for (TaskResponse value : taskResponseMap.get(task.taskUuid).values()) {
if (value.status != TaskResponseStatus.SUCCESSFUL) {
return false;
}
}
}
return true;
}
因此从概念上解释我的代码做什么,sendTaskToAllEndpoints()
方法发送Task
对象远程端点,并等待,直到while
循环内超时。 只要远程端点响应,它就会执行endpointHasRespondedCallback()
方法,以便它标记为在TaskResponseMap
中完成。 早在sendTaskToAllEndPoints()
方法,我检查的任务是通过使用isTaskDone()
helper方法做,如果没有,我只是继续和调用Thread.sleep(1000)
等待一秒钟,直到我下一次检查。
我面临的问题是,即使我看到endpointHasRespondedCallback()
方法正在执行的所有我调度到(我用日志语句验证)的远程节点,它等待块外,只有在那里无论何时在sendTaskToAllEndpoints()
方法中发生超时,因此即使所有节点都已正确响应,我的任务仍会超时。
这是我没有想到的,因为即使我在while
循环中获取两个对象上的锁,我在睡觉前解锁它,并且我假设sendTaskToAllEndpoints()
线程进入睡眠状态时,等待锁的其他线程应获取endpointHasRespondedCallback()
方法中的锁,并将Task
标记为按预期完成。
我大概可以实现我用更好的方式来解决这个问题的方案,但我想知道什么是对这种行为的合乎逻辑的解释。
你的同步代码看起来很好,尽管你不需要'taskMap'上的同步,因为只有一个线程会改变它。(此外,如果你搞砸了一些东西,那么拥有依赖获取多个锁的同步逻辑是一种简单的方法,最终会导致死锁;你的目标应该是锁定对象的数量更少而不是更多,同时仍然保持线程安全。)你是否肯定没有其他代码(你没有发布)正在获取任何一个对象的锁? – Tim
isTaskOver是公开的,但不是线程安全的!你应该改变为私人。顺便说一句。避免使用循环+ Thread.sleep进行主动等待。更好地使用条件等待(wait + notifyAll)。 – isnot2bad
我为使其工作的唯一更改是通过删除while循环内的同步块。这应该意味着阻止我的另一个线程运行的只是while循环内的synchronized块。我知道在C中,编译器可能[改变正在执行的东西的顺序](http://stackoverflow.com/questions/4437527/why-do-we-use-volatile-keyword-in-c),以优化该程序影响了多线程程序的功能。在Java中也有类似的东西吗? – YShin