2012-01-18 158 views
7

我想要并行执行一些不同的任务,但有一个概念,如果一个任务已经排队或正在处理,它不会被重新排队。我已经阅读了一些关于Java API的文章,并且已经提出了下面的代码,这似乎很有用。 任何人都可以阐明我使用的方法是否是最好的方法。任何危险(线程安全?)或更好的方法来做到这一点? 守则如下:线程池处理'重复'任务

import java.util.HashMap; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2; 
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>(); 
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); 
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q); 

    public static void main(String[] args) { 
     try { 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    static boolean execute(TestExecution e) { 
     System.out.println("Handling "+e.key1+":"+e.key2); 
     if (executions.containsKey(e)) { 
     Future<?> f = (Future<?>) executions.get(e); 
     if (f.isDone()) { 
      System.out.println("Previous execution has completed"); 
      executions.remove(e); 
     } else { 
      System.out.println("Previous execution still running"); 
      return false; 
     }   
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     Future<?> f = tpe.submit(e); 
     executions.put(e, f);    
     return true; 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    }    
} 

跟进以下评论:
的计划是触发该任务将执行由cron调用RESTful Web服务来处理。例如,下面是每天9点30分触发的一项任务的设置,另外每两分钟设置一项任务。

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22 

在这种情况下,如果任务key11:KEY12运行,或者已经排队跑,我不想排队的另一个实例。我知道我们有其他选择,但我们倾向于使用cron来完成其他任务,所以我想尽量保持这一点。

第二次更新。针对迄今为止的意见,我重新编写了代码,您能否对以下更新解决方案的任何问题发表评论?

import java.util.concurrent.LinkedBlockingQueue; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2;  
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>()); 

    public static void main(String[] args) { 
     try { 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     tpe.execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    } 
} 


import java.util.Collections; 
import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


public class TestThreadPoolExecutor extends ThreadPoolExecutor { 
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>()); 

    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {  
     super(2, 5, 1, TimeUnit.MINUTES, q);  
    } 

    public void execute(Runnable command) { 
     if (executions.contains(command)) { 
     System.out.println("Previous execution still running"); 
     return; 
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     super.execute(command);  
     executions.add(command);  
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t);   
     executions.remove(r); 
    }  
} 
+0

为什么不使用TestExecution代替HashMap中的HashSet的? – 2016-02-21 22:25:29

回答

2

一对夫妇的意见:

    在执行法
  • ,你会得到“处决”(的containsKey)的读取和写入(删除或认沽)如果之间的竞争条件多个线程同时调用此方法。你需要把所有的调用都包含在一个被认为是同步块中原子的“执行”中。 (在你的情况,使得该方法的同步将工作)http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • 你应该使用一个单身,而不是处理状态的静态(即全局)变量

但我真的想知道更多一点关于你的设计了解你想达到的目标。为什么一个任务会被排队执行几次?

+0

谢谢,我用更多的信息更新了这个问题。 – Patrick 2012-01-18 06:59:51

+1

而对于更面向对象的设计,我会考虑对ThreadPoolExecutor进行子类化,并将用于管理执行映射的代码放入execute()和afterExecute()函数中。 (在我看来,调用execute()而不是submit()方法会更加正确,但规范在这一点上并不明确) – 2012-01-18 07:28:14

+0

干杯,根据您的建议重写了代码。这看起来好吗? – Patrick 2012-01-18 09:10:20

3

这是我将如何处理和避免重复

import java.util.Collections; 
import java.util.Set; 
import java.util.concurrent.*; 

public class TestExecution implements Callable<Void> { 
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>()); 

    private final String key1; 
    private final String key2; 

    public static void main(String... args) throws InterruptedException { 
     new TestExecution("A", "A").execute(); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     Thread.sleep(8000); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     new TestExecution("B", "B").execute(); 
     TPE.shutdown(); 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2; 
    } 

    void execute() { 
     if (TE_SET.add(this)) { 
      System.out.println("Handling " + this); 
      TPE.submit(this); 
     } else { 
      System.out.println("... ignoring duplicate " + this); 
     } 
    } 

    public boolean equals(Object obj) { 
     return obj instanceof TestExecution && 
       key1.equals(((TestExecution) obj).key1) && 
       key2.equals(((TestExecution) obj).key2); 
    } 

    public int hashCode() { 
     return key1.hashCode() * 31 + key2.hashCode(); 
    } 

    @Override 
    public Void call() throws InterruptedException { 
     if (!TE_SET.remove(this)) { 
      System.out.println("... dropping duplicate " + this); 
      return null; 
     } 
     System.out.println("Start processing " + this); 
     Thread.sleep(4000); 
     System.out.println("Finish processing " + this); 
     return null; 
    } 

    public String toString() { 
     return key1 + ':' + key2; 
    } 
} 

打印

Handling A:A 
... ignoring duplicate A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
Finish processing A:A 
Finish processing B:B 
Handling A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
... ignoring duplicate B:B 
Finish processing B:B 
Finish processing A:A 
+0

好的,谢谢,有一些很好的指针,尤其是通过使用ConcurrentHashMap并重写toString方法避免多线程问题。几个问题。为什么不使用HashSet(是因为没有等价的线程安全对象来使用?)我也不理解从HashMap中移除的代码。您似乎在处理开始时这样做,不应该在处理结束时进行处理吗? – Patrick 2012-01-18 09:06:14

+1

您可以使用'Collections.synchronizedSet(new HashSet())'这是线程安全的,但不是并发的。无论是在开始还是结束时都取决于您的要求。偶尔做两次或偶尔不做什么更好(因为新任务是在任务结束和被删除之间添加的) – 2012-01-18 09:10:02

+0

好吧,我真的不知道'线程安全'和'并发'之间的区别,也许我在那里做一些调查。但是为了移除元素,如果我不想重新执行(或排队)已经开始的作业,那么应该将TE_SET.remove移动到Call函数的末尾,对吧?我是否正确地认为“丢弃重复”情况是一个错误情况?如果我们在调用函数中,那么一个元素应该总是被写入HashSet的权利? – Patrick 2012-01-18 09:18:48