2016-03-11 22 views
0

首先发布这里,所以大家好。 我正在构建一个小型客户端,需要通过套接字发送心跳,并通过套接字接收各种固定长度的二进制消息。 我有一个连接管理器类,用于启动套接字连接并创建2个线程,每个线程每隔x秒发送一次心跳。另一个线程监听输入。Java ExecutorService线程写入相同的字段

当它接收到输入时,它将头部读入字节数组(总是4个字节),得到消息长度x = byte [3],然后读取下一个x字节到消息字节数组。然后它使用ExecutorService创建一个新的messasge处理器对象,它接收消息字节数组。

messasge处理器实现可运行并打印出消息字节。 但是,如果套接字接收到大量数据,即服务器按顺序发送消息,那么我的消息处理器似乎会打印出不同线程中的混合数据 - 我认为我的执行程序会创建一个新线程安全的对象,并具有自己的二进制实例[ ] msgBytes。

的ConnectionManager低于其产生发送/接收线程,RCV线程创建一个线程池,并且当读出消息创建新的MessageProcessor目的是处理字节[]消息

public class connectionManager extends Thread { 
    public connectionManager(InetAddress host, int serverSocket) { 
     System.out.println("in connectionManager create"); 
     try { 
      clientSocket = new Socket(host, serverSocket); 
      outToServer = new DataOutputStream(clientSocket.getOutputStream()); 
      connected = true; 
      final ExecutorService executor = Executors.newFixedThreadPool(10); 

      final Thread inThread = new Thread() { 
       @Override 
       public void run() { 
        while (connected) { 
         try { 
          DataInputStream dIn = new DataInputStream(clientSocket.getInputStream()); 

          byte[] header = new byte[4]; 
          dIn.readFully(header); // read the message        

          String msgType = new String(new byte[]{ header[3] }, "US-ASCII"); 

          short length = getShortFromLittleEndianRange(header, 1); //test function to return length offset is start position of length 
          byte[] message = new byte[length - 1]; //minus the msg type byte 

          dIn.readFully(message); 

          executor.execute(new MessageProcessor(message, msgType)); 
          } catch (Exception e) { 
           e.printStackTrace(); 
           connected = false; 
          } 
        } 
        executor.shutdown(); 
        System.out.println("Shutdown executor"); 
     }; 
      }; 
      inThread.start();   


      final Thread outThread = new Thread() { 
      @Override 
       public void run() { 
        Heartbeat hb = new Heartbeat(); 
        while(connected) { 
         PrintWriter out = null; 
         try { 
          this.sleep(3000);       
          outToServer.write(hb.serialize()); 
         } catch (Exception e) { 
          e.printStackTrace(); 
          System.out.println("cant send heartbeat server not alive..."); 
         } 
        } 
     }; 
      }; 
      outThread.start();    
     } catch (Exception e) { 
      System.out.println("Cannot connect to server"); 
      System.exit(0); 
     } 
    }  
} 

MessageProcess实现可运行我期望执行到创建新的对象MessageProcess各有自己的msgBytes的实例,并MSGTYPE

public class MessageProcessor implements Runnable { 

    private byte[] msgBytes; 
    private String msgType; 

    MessageProcessor(byte[] newMsgBytes, String newMsgType) { 
    msgBytes = newMsgBytes; 
    msgType = newMsgType; 
    }  

    @Override 
    public void run() { 
    output(); 
    } 

    synchronized void output() { 
     System.out.println("\nMessageProcessor process inbound message " + Thread.currentThread().getId()); 
     System.out.println("message type : " + " " + msgType); 
     for(byte b : msgBytes){ 
      System.out.printf("%02X",b); 
     }   
     System.out.println("\nfinished MessageProcessor " + Thread.currentThread().getId());   
    } 
} 

然而,当我运行并接收来自服务器我康索尔输出看起来封邮件“洪水”,好像有一个线程安全ISSU e:下面的示例输出。

MessageProcessor process inbound message 16 
message type : 8 

01D40806004530515266385779624C6D653830 
MessageProcessor process inbound message 17 
2D36667664374E6D6A4100000000000000003030515266444E31417A766846000000000000000000000000020000000000A90DAE0400000001000000000000000000000000FA10000000000100000000000000004C43484C4742324500000052D7048024E3F70400BCBDE256306F0100000203003030515266444E31417A766800000000message type : 8 


finished MessageProcessor 16 
01D50806004530515266385779624C6D6638302D366676643176584A5200000000000000003030515266444E31417A766246000000000000000000000000010000000000A90DAE0400000001000000090000000109000000FA10000000000200000000000000004C43484C4742324500000041D7048024E3F70400BCBDE256306F0100000003003030515266444E31417A766200000000 
finished MessageProcessor 17 

我希望看到

MessageProcessor process inbound message 16 
message type : 8 
<hex> 

finished MessageProcessor 16 

MessageProcessor process inbound message 17 
message type : 8 
<hex> 

finished MessageProcessor 17 

我不是在我在做什么threas安全吗?

非常感谢 马特

回答

0

由于您使用的MessageProcessor(每封邮件一个)不同的情况下,上output()的​​标签没有我想您所期待的效果。每个实例将能够访问自己的output()而不被其他实例阻挡。

您看到的输出是来自不同实例的输出的混合。为防止混音,您可以使用StringBuilder构建您的输出,并在一次调用中将其提交给System.out。如果您想跨实例同步输出,请考虑使用队列。

+0

感谢您的快速响应 - 所以我线程安全,它只是一个输出问题?即如果我要从消息字节创建一个消息对象,那么每个消息对象都有正确的数据。 –

+0

'MessageProcessor'内的数据是安全的。假设你以安全的方式构造消息对象,它们将会很好。 – bradimus

+0

非常感谢bradimus我改成了StringBuilder,输出看起来不错。 –

相关问题