首先发布这里,所以大家好。 我正在构建一个小型客户端,需要通过套接字发送心跳,并通过套接字接收各种固定长度的二进制消息。 我有一个连接管理器类,用于启动套接字连接并创建2个线程,每个线程每隔x秒发送一次心跳。另一个线程监听输入。Java ExecutorService线程写入相同的字段
当它接收到输入时,它将头部读入字节数组(总是4个字节),得到消息长度x = byte [3],然后读取下一个x字节到消息字节数组。然后它使用ExecutorService创建一个新的messasge处理器对象,它接收消息字节数组。
messasge处理器实现可运行并打印出消息字节。 但是,如果套接字接收到大量数据,即服务器按顺序发送消息,那么我的消息处理器似乎会打印出不同线程中的混合数据 - 我认为我的执行程序会创建一个新线程安全的对象,并具有自己的二进制实例[ ] msgBytes。
的ConnectionManager低于其产生发送/接收线程,RCV线程创建一个线程池,并且当读出消息创建新的MessageProcessor目的是处理字节[]消息
public class connectionManager extends Thread {
public connectionManager(InetAddress host, int serverSocket) {
System.out.println("in connectionManager create");
try {
clientSocket = new Socket(host, serverSocket);
outToServer = new DataOutputStream(clientSocket.getOutputStream());
connected = true;
final ExecutorService executor = Executors.newFixedThreadPool(10);
final Thread inThread = new Thread() {
@Override
public void run() {
while (connected) {
try {
DataInputStream dIn = new DataInputStream(clientSocket.getInputStream());
byte[] header = new byte[4];
dIn.readFully(header); // read the message
String msgType = new String(new byte[]{ header[3] }, "US-ASCII");
short length = getShortFromLittleEndianRange(header, 1); //test function to return length offset is start position of length
byte[] message = new byte[length - 1]; //minus the msg type byte
dIn.readFully(message);
executor.execute(new MessageProcessor(message, msgType));
} catch (Exception e) {
e.printStackTrace();
connected = false;
}
}
executor.shutdown();
System.out.println("Shutdown executor");
};
};
inThread.start();
final Thread outThread = new Thread() {
@Override
public void run() {
Heartbeat hb = new Heartbeat();
while(connected) {
PrintWriter out = null;
try {
this.sleep(3000);
outToServer.write(hb.serialize());
} catch (Exception e) {
e.printStackTrace();
System.out.println("cant send heartbeat server not alive...");
}
}
};
};
outThread.start();
} catch (Exception e) {
System.out.println("Cannot connect to server");
System.exit(0);
}
}
}
MessageProcess实现可运行我期望执行到创建新的对象MessageProcess各有自己的msgBytes的实例,并MSGTYPE
public class MessageProcessor implements Runnable {
private byte[] msgBytes;
private String msgType;
MessageProcessor(byte[] newMsgBytes, String newMsgType) {
msgBytes = newMsgBytes;
msgType = newMsgType;
}
@Override
public void run() {
output();
}
synchronized void output() {
System.out.println("\nMessageProcessor process inbound message " + Thread.currentThread().getId());
System.out.println("message type : " + " " + msgType);
for(byte b : msgBytes){
System.out.printf("%02X",b);
}
System.out.println("\nfinished MessageProcessor " + Thread.currentThread().getId());
}
}
然而,当我运行并接收来自服务器我康索尔输出看起来封邮件“洪水”,好像有一个线程安全ISSU e:下面的示例输出。
MessageProcessor process inbound message 16
message type : 8
01D40806004530515266385779624C6D653830
MessageProcessor process inbound message 17
2D36667664374E6D6A4100000000000000003030515266444E31417A766846000000000000000000000000020000000000A90DAE0400000001000000000000000000000000FA10000000000100000000000000004C43484C4742324500000052D7048024E3F70400BCBDE256306F0100000203003030515266444E31417A766800000000message type : 8
finished MessageProcessor 16
01D50806004530515266385779624C6D6638302D366676643176584A5200000000000000003030515266444E31417A766246000000000000000000000000010000000000A90DAE0400000001000000090000000109000000FA10000000000200000000000000004C43484C4742324500000041D7048024E3F70400BCBDE256306F0100000003003030515266444E31417A766200000000
finished MessageProcessor 17
我希望看到
MessageProcessor process inbound message 16
message type : 8
<hex>
finished MessageProcessor 16
MessageProcessor process inbound message 17
message type : 8
<hex>
finished MessageProcessor 17
我不是在我在做什么threas安全吗?
非常感谢 马特
感谢您的快速响应 - 所以我线程安全,它只是一个输出问题?即如果我要从消息字节创建一个消息对象,那么每个消息对象都有正确的数据。 –
'MessageProcessor'内的数据是安全的。假设你以安全的方式构造消息对象,它们将会很好。 – bradimus
非常感谢bradimus我改成了StringBuilder,输出看起来不错。 –