我正在构建PostgreSQL有线协议的简单实现,并希望客户端将消息发送到我的服务,然后在后台异步处理它们。我在使用ExecutorService
与使用原始线程的时候有点困难。我正在使用两个BlockingQueue
- 一个用于发送消息并将它们发送到服务器,另一个用于接收消息,目前我的代码位于下方。消息的异步处理 - 要使用哪个并发基元?
我想知道的是,在这里使用ExecutorService还是应该创建并启动ReceiveThread
和SendThread
作为独立线程(即new Thread(new ReceiveThread()).start();
)?
import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Connection {
private MessageBuilder builder;
private MessageReader reader;
private BlockingQueue<byte[]> sendQueue;
private BlockingQueue<byte[]> receiveQueue;
private ExecutorService exec = Executors.newFixedThreadPool(2);
private Socket socket;
public Connection(String hostName, int port, String username) throws IOException {
this(new MessageBuilder(), new MessageReader(), hostName, port, username);
}
public Connection(MessageBuilder builder, MessageReader reader, String hostName, int port, String username) throws IOException {
this.builder = builder;
this.reader = reader;
this.sendQueue = new LinkedBlockingDeque<byte[]>();
this.receiveQueue = new LinkedBlockingDeque<byte[]>();
socket = new Socket(hostName, port);
List<Param> params = new ArrayList<Param>();
params.add(new Param("user", username));
sendQueue.add(builder.buildStartupMessage(3, 0, params));
exec.submit(new SendThread(sendQueue, new DataOutputStream(socket.getOutputStream())));
exec.submit(new ReceiveThread(receiveQueue, new DataInputStream(socket.getInputStream()), new MessageReader()));
}
public void sendMessage(byte[] bytes) {
sendQueue.add(bytes);
}
public void closeConnection() throws IOException {
socket.close();
}
}
class ReceiveThread implements Callable<Boolean> {
private BlockingQueue<byte[]> receiveQueue;
private DataInputStream dis;
private MessageReader reader;
public ReceiveThread(BlockingQueue<byte[]> queue, DataInputStream dis, MessageReader reader) {
this.receiveQueue = queue;
this.dis = dis;
this.reader = reader;
}
public Boolean call() throws Exception {
byte msgByte = dis.readByte();
System.out.println("Response type is: " + (char) msgByte);
int length = dis.readInt();
byte[] message = new byte[length+1];
message[0] = msgByte;
byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
System.arraycopy(bytes, 0, message, 1, bytes.length);
int readLength = dis.read(message, 5, length - 5);
System.out.println("readLength : " + readLength + " should be length: " + (length-5));
receiveQueue.put(message);
return true;
}
}
class SendThread implements Callable<Boolean> {
private BlockingQueue<byte[]> sendQueue;
private DataOutputStream dos;
public SendThread(BlockingQueue<byte[]> queue, DataOutputStream dos) {
this.sendQueue = queue;
this.dos = dos;
}
public Boolean call() throws Exception {
byte[] message = sendQueue.take();
dos.write(message);
return true;
}
}