2016-01-21 53 views
0

我正在构建PostgreSQL有线协议的简单实现,并希望客户端将消息发送到我的服务,然后在后台异步处理它们。我在使用ExecutorService与使用原始线程的时候有点困难。我正在使用两个BlockingQueue - 一个用于发送消息并将它们发送到服务器,另一个用于接收消息,目前我的代码位于下方。消息的异步处理 - 要使用哪个并发基元?

我想知道的是,在这里使用ExecutorService还是应该创建并启动ReceiveThreadSendThread作为独立线程(即new Thread(new ReceiveThread()).start();)?

import java.io.*; 
import java.net.Socket; 
import java.nio.ByteBuffer; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 


public class Connection { 

    private MessageBuilder builder; 
    private MessageReader reader; 

    private BlockingQueue<byte[]> sendQueue; 
    private BlockingQueue<byte[]> receiveQueue; 

    private ExecutorService exec = Executors.newFixedThreadPool(2); 

    private Socket socket; 

    public Connection(String hostName, int port, String username) throws IOException { 
     this(new MessageBuilder(), new MessageReader(), hostName, port, username); 
    } 

    public Connection(MessageBuilder builder, MessageReader reader, String hostName, int port, String username) throws IOException { 
     this.builder = builder; 
     this.reader = reader; 
     this.sendQueue = new LinkedBlockingDeque<byte[]>(); 
     this.receiveQueue = new LinkedBlockingDeque<byte[]>(); 
     socket = new Socket(hostName, port); 
     List<Param> params = new ArrayList<Param>(); 
     params.add(new Param("user", username)); 
     sendQueue.add(builder.buildStartupMessage(3, 0, params)); 

     exec.submit(new SendThread(sendQueue, new DataOutputStream(socket.getOutputStream()))); 
     exec.submit(new ReceiveThread(receiveQueue, new DataInputStream(socket.getInputStream()), new MessageReader())); 
    } 

    public void sendMessage(byte[] bytes) { 
     sendQueue.add(bytes); 
    } 

    public void closeConnection() throws IOException { 
     socket.close(); 
    } 
} 

class ReceiveThread implements Callable<Boolean> { 
    private BlockingQueue<byte[]> receiveQueue; 
    private DataInputStream dis; 
    private MessageReader reader; 

    public ReceiveThread(BlockingQueue<byte[]> queue, DataInputStream dis, MessageReader reader) { 
     this.receiveQueue = queue; 
     this.dis = dis; 
     this.reader = reader; 
    } 

    public Boolean call() throws Exception { 
     byte msgByte = dis.readByte(); 
     System.out.println("Response type is: " + (char) msgByte); 
     int length = dis.readInt(); 
     byte[] message = new byte[length+1]; 
     message[0] = msgByte; 
     byte[] bytes = ByteBuffer.allocate(4).putInt(length).array(); 
     System.arraycopy(bytes, 0, message, 1, bytes.length); 
     int readLength = dis.read(message, 5, length - 5); 
     System.out.println("readLength : " + readLength + " should be length: " + (length-5)); 
     receiveQueue.put(message); 
     return true; 
    } 
} 

class SendThread implements Callable<Boolean> { 

    private BlockingQueue<byte[]> sendQueue; 
    private DataOutputStream dos; 

    public SendThread(BlockingQueue<byte[]> queue, DataOutputStream dos) { 
     this.sendQueue = queue; 
     this.dos = dos; 
    } 

    public Boolean call() throws Exception { 
     byte[] message = sendQueue.take(); 
     dos.write(message); 
     return true; 
    } 
} 

回答

0

使用ExecutorService的主要好处是它提供了线程池。如果你有很多短命线程,那么由这些线程的创建所引起的开销可能会很大。 ExecutorService可以通过使用线程池并为线程分配任务来解决这个问题。它还负责安排。比如说,你有15000个工作要做,每个需要5ms才能完成。创建15000个线程显然没有意义。 ExecutorService将安排您的工作,比如4个线程。根据您的需要,您可以使用不同类型的ExecutorService

我不完全理解你的代码,你为什么不在发送和接收线程中使用循环。您的代码只会收到/发送一条我认为不是您的意图的消息。您可以摆脱ExecutorService并拥有2个长期线程,一个用于发送,一个用于接收(与您目前的一样)。但是,在这些内部,您应该编写一个while循环来定期检查队列的内容。