2016-03-09 30 views
1

我有用于启动的螺纹服务器下面的代码:我可以使用什么样的Java线程池来处理来自套接字的数据?

Thread server = new Thread(new ServerRunnable(serverPort, devMode, messageQueue, database)); 
server.start(); 

Thread worker1 = new Thread(
    new WorkerRunnable(
     messageQueue, 
     database, 
     devMode, 
     1 
    )); 
Thread worker2 = new Thread(
    new WorkerRunnable(
     messageQueue, 
     database, 
     devMode, 
     2 
    )); 
Thread worker3 = new Thread(
    new WorkerRunnable(
     messageQueue, 
     database, 
     devMode, 
     3 
    )); 

worker1.start(); 
worker2.start(); 
worker3.start(); 

ServerRunnable传递一个包含字节读取和其它信息到messageQueue一个状态对象。 WorkerRunnable线程获取消息并处理它们。

我在看着ThreadPoolExecutor,希望我可以用它来替换上面的三个工作线程,可以根据需要增加或减少一个池,但它不能按我的预期工作。它想要完成一个任务队列。

我的代码使用Java NIO,因此不能保证放入队列的项目已完成,因此可能需要进一步处理。因此我不能以我想象中的方式使用ThreadPoolExecutor,这将通过Runnable来包含排队。

所以,这让我感到,如果我想使用ThreadPoolExecutor这里我必须将它添加在ServerRunnable类(种反转我的当前进程),并传递一个消息参数后,通过一项新的WorkerRunnable类到ThreadPoolExecutor队列到它。 这是正确的吗?

事情是这样的,也许:

LinkedBlockingQueue messageQueue = new LinkedBlockingQueue<Runnable>(); 

Thread server = new Thread(
    new ServerRunnable(
     serverPort, 
     devMode, 
     messageQueue, 
     database 
    )); 
server.start(); 
int corePoolSize = 5; 
int maxPoolSize = 10; 
long keepAliveTime = 5000; 

ExecutorService threadPoolExecutor = 
    new ThreadPoolExecutor(
     corePoolSize, 
     maxPoolSize, 
     keepAliveTime, 
     TimeUnit.MILLISECONDS, 
     messageQueue); 
threadPoolExecutor.prestartAllCoreThreads() 

然后在ServerRunnable类:

// Process incoming bytes into a message 

messageQueue.put(new WorkerRunnable(database, devMode, message)); 

是我的理解是否正确?

+1

您可以通过execute(Runnable)将Runnable实例添加到ExecutorService。为了创建ExecutorService,我建议你给Executors一下看,因为它有几种创建各种设置的方法。 – gfelisberto

+0

是不是有没有使用类似Netty IO的库的原因? –

+0

是的,我们最初认为这条路线,但我们的设备不支持http,而当我在研究netty时,我发现自己一直都在指向NIO。所以我直接在NIO中实现。 –

回答

1

是的,无论是或者你可以让你的队列中有单个消费者,它将继续将任务传递给ExecutorService。像这样的东西

Thread consumer = new Thread(new Consumer(queue)); 
consumer.start(); 

class Consumer implements Runnable { 
    private ExecutorService  service = Executors.newCachedThreadPool(); 
    private final BlockingQueue queue; 

    public Consumer(Queue queue) { 
     this.queue = queue; 
    } 

    @Override 
    public void run() { 
     Task t = null; 
     while(t = queue.take()) { 
      Worker worker = new Worker(t); 
      service.execute(worker); 
     } 
    } 
} 

这与ExecutorService有点松散耦合。

+0

好的,谢谢!这看起来不错。我会给它一个! –

相关问题