2013-10-17 121 views
0

我接手了一个应用程序,并被要求把它从“一个端口只支持一个连接”改造成支持多个连接。我对套接字编程还很陌生,在解决这个问题时非常需要一些帮助。不幸的是,团队里没有其他人可以请教,我仍在通过阅读这些代码来熟悉这个应用程序。(主题:处理多个套接字连接)

我的代码问题如下。我有一个run()方法,它会调用同一个类中的connect(),但我需要在每建立一个连接之后都能继续执行run()的其余代码,并验证该activeSocket仍在activeSockets列表中。目前,connect()会先把连接数建立到maxActiveSockets上限,然后run()才能继续执行,而这正是问题的一部分。我曾尝试在第一个连接建立后就跳出外层循环,但之后就看不到第二个连接进入应用程序了。非常希望有人能帮我再检查一遍这段代码。

我希望这是有道理的。

任何帮助/方向将不胜感激。谢谢。

这里是run()的代码:

/**
 * Thread entry point for this channel.
 *
 * <p>Unless termination was already requested, connects via {@link #connect()},
 * then starts the timers and repeatedly walks {@code activeSockets} until
 * {@code shouldTerminate} is set. For each entry the thread waits on the
 * socket's monitor (unless a reconnect is already flagged); when
 * {@code getNeedReconnect()} reports {@code true} it sends a notification,
 * closes the socket and its streams, and calls {@code connect()} again.
 *
 * <p>On exit from the loop it notifies waiters on this object, closes every
 * socket in {@code activeSockets}, stops the timers, updates the
 * {@code started}/{@code terminated} flags, waits up to
 * {@code COUNTDOWN_WAIT_SECONDS} on {@code internalTerminatedSignal} and
 * finally counts down {@code terminatedSignal}.
 */
public void run() {
    Socket socket;
    started = true;

    if (!shouldTerminate) {
        LOGGER.info("Connecting to host " + Thread.currentThread().getName());
        connect();
        hardstandin = false;
    }
    startTimers();
    while (!shouldTerminate) {
        // Make sure connection stays up unless shouldTerminate is set

        // ***** 20130912 MS - Per Shannon's email this method had to be changed to utilize the activeSocket fields. *****
        Iterator<ActiveSocket> it = activeSockets.iterator();
        while (it.hasNext()) {
            ActiveSocket activeSocket = it.next();
            socket = activeSocket.getSocket();
            DataInputStream dataInputStream = activeSocket.getDataInputStream();
            DataOutputStream dataOutputStream = activeSocket.getDataOutputStream();

            // ORIGINAL CODE BLOCK STARTS HERE ==>
            if (socket != null) {
                synchronized (socket) {
                    try {
                        // Park on the socket's monitor until some other thread notifies it,
                        // unless we are shutting down or a reconnect is already pending.
                        if (!shouldTerminate && !getNeedReconnect().get()) {
                            hardstandin = false;
                            socket.wait();
                        }
                    } catch (final InterruptedException ex) {
                        // The interrupt flag is deliberately not restored here: with the flag
                        // set, the next wait() in this loop would throw again immediately.
                        LOGGER.info("Error waiting for new socket Interrupted Exception received.");
                    }
                }
            }
            if (getNeedReconnect().get()) {
                hardstandin = true;
                final String channelName = "Bank " + bankInit + "; Bank ID " + getBankID();
                final String logMessage = channelName.concat("\n\nData input stream on socket connection could not be read. Attempting to re-establish the connection.");
                final String subject = channelName.concat(" Socket connection error");
                ATMServer.sendNotification(subject, logMessage);

                // The socket and either stream may be null (e.g. an entry whose stream
                // setup failed), so each one is guarded and closed independently; a
                // failure closing one must not prevent closing the others.
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (final IOException ex) {
                        LOGGER.info(ERR_SOCKCLOSE);
                    }
                }
                socket = null;
                if (dataInputStream != null) {
                    try {
                        dataInputStream.close();
                    } catch (final IOException e) {
                        LOGGER.warn("Could not close input/output streams: " + FormatData.formatStack(e));
                    }
                }
                if (dataOutputStream != null) {
                    try {
                        dataOutputStream.close();
                    } catch (final IOException e) {
                        LOGGER.warn("Could not close input/output streams: " + FormatData.formatStack(e));
                    }
                }

                dataInputStream = null;
                dataOutputStream = null;
                // NOTE(review): connect() is invoked while 'it' is still iterating
                // activeSockets, and the entry just closed is not removed from that
                // collection. Depending on the collection type an add inside connect()
                // may invalidate this iterator - confirm the intended reconnect design.
                connect();

                // NOTE(review): both stream locals were set to null just above and nothing
                // in this method reassigns them, so this test can currently only pass via
                // shouldTerminate; the reconnect flag is therefore not cleared after a
                // successful reconnect. Left unchanged pending a decision on how success
                // should be detected now that streams live on ActiveSocket.
                if (((dataInputStream != null) && (dataOutputStream != null)) || shouldTerminate) {
                    getNeedReconnect().set(false);
                    hardstandin = false;
                    synchronized (this) {
                        LOGGER.info("Sending notifyAll on channel");
                        this.notifyAll();
                    }
                }
            } // <== ORIGINAL CODE BLOCK ENDS HERE.
        }
    }

    // *** shutdown the process
    // need to shutdown the positiveBalance
    // need to shut down each receive queuer
    synchronized (this) {
        LOGGER.info("Sending notifyAll on channel");
        this.notifyAll();
    }
    // ***** 20130913 MS - Per Shannon's email, modify code to utilize the multiple connection code. *****
    for (final ActiveSocket activeSocket : activeSockets) {
        socket = activeSocket.getSocket();

        // ORIGINAL CODE BLOCK STARTS HERE. ==>
        if (socket != null) {
            try {
                socket.close();
            } catch (final IOException ex) {
                LOGGER.info(ERR_SOCKCLOSE);
            }
        }
        // <== ORIGINAL CODE BLOCK ENDS HERE.
    }
    stopTimers();
    terminated = true;
    started = false;
    if (internalTerminatedSignal != null) {
        try {
            internalTerminatedSignal.await(COUNTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            LOGGER.info(FormatData.fullStackTrace(e));
            // No further blocking calls follow, so it is safe to hand the interrupt
            // back to whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }
    if (terminatedSignal != null) {
        LOGGER.info(this.getClass().getName() + " Updating countdown latch");
        terminatedSignal.countDown();
    }

}

下面是connect()的代码:

/**
 * Establishes socket connections for this channel and registers them in
 * {@code activeSockets}, looping until {@code shouldTerminate} is set or one
 * of the exit conditions below is met.
 *
 * <p><b>Server mode</b> ({@code isServer}): lazily opens {@code serverSocket}
 * on {@code port}, then accepts connections while fewer than
 * {@code maxActiveSockets} are registered. The method returns early only when
 * {@code host} is null; otherwise it keeps looping until
 * {@code shouldTerminate} is set.
 *
 * <p><b>Client mode</b>: connects to {@code remoteHost}:{@code remotePort}.
 * A connection attempt is made when {@code loopCounter} is a multiple of 120;
 * between attempts the thread waits up to one second per iteration on
 * {@code clientConnectLock}. The method leaves the loop after one pass through
 * stream setup that did not throw.
 *
 * <p>For every newly obtained socket, buffered data streams are attached to the
 * {@code ActiveSocket} and its receive queuers are named and started.
 */
public void connect() {

    boolean clientSocket = false;

    LOGGER.info("Connecting ... ");

    while (!shouldTerminate) {

        ActiveSocket activeSocket = null;
        Socket newSocket = null;
        int tries = 0;
        int loopCounter = 0;

        if (isServer) {
            if (host == null) {
                LOGGER.info("Must specify a host ip or host name with HOST configuration tag ");
                return;
            }
            if (!shouldTerminate && (activeSockets.size() >= this.maxActiveSockets)) {
                // Every slot is taken, so the accept loop below will not run. Without a
                // pause the enclosing loop would spin at full speed until termination;
                // wait briefly instead, then re-check for a free slot.
                // NOTE(review): in server mode this method still never reaches the
                // "Socket connection complete" log below - confirm whether it should
                // return once the pool is full.
                try {
                    synchronized (clientConnectLock) {
                        clientConnectLock.wait(1000);
                    }
                } catch (final InterruptedException ex) {
                    LOGGER.info("Interrupted while waiting for connection.");
                }
            }
            while (!shouldTerminate && (activeSockets.size() < this.maxActiveSockets)) {
                activeSocket = new ActiveSocket(this);
                newSocket = null;
                if (serverSocket != null) {
                    try {
                        LOGGER.info("Accept socket.");
                        newSocket = serverSocket.accept();
                    } catch (final IOException ex) {
                        if (shouldTerminate) {
                            if (ex.getMessage() != null) {
                                LOGGER.info(ex.getMessage());
                            }
                        } else {
                            LOGGER.info("IOException while accepting connection.");
                            LOGGER.warn(FormatData.fullStackTrace(ex));
                        }
                        newSocket = null;
                    }
                    if (newSocket != null) {
                        try {
                            LOGGER.info("Server socket keepalive ... ");
                            newSocket.setKeepAlive(true);
                            if (newSocket.getKeepAlive()) {
                                LOGGER.info("Server socket keep alive to host (" + host + "," + port + ") for BankID:" + Integer.toString(this.getBankID()));
                            }
                            activeSocket.setSocket(newSocket);
                            activeSockets.add(activeSocket);
                            increaseConnects();
                            // One accepted connection per pass; streams are attached below.
                            break;
                        } catch (final SocketException ex) {
                            LOGGER.info("SocketException while opening socket.");
                            LOGGER.warn(FormatData.fullStackTrace(ex));
                            newSocket = null;
                        }
                    }
                } else { // first time through the loop
                    try {
                        LOGGER.info("Opening server socket (" + host + "," + port + ") for BankID:" + Integer.toString(this.getBankID()));
                        serverSocket = new ServerSocket(port);
                    } catch (final IOException ex) {
                        LOGGER.info("Unable to open server socket socket (" + host + "," + port + ")");
                        // getMessage() may be null, so guard before searching it.
                        final String message = ex.getMessage();
                        if ((message != null) && (message.indexOf("Cannot assign requested address") > -1)) {
                            this.terminate();
                            final String logMessage = "Invalid IP Address assigned:" + host + ",port:" + port;
                            final String subject = logMessage;
                            ATMServer.sendNotification(subject, logMessage);
                        } else if (tries == 0) {
                            // Notify only on the first failure of this pass.
                            tries++;
                            final String logMessage = "Unable to open server socket (" + host + "," + port + ")";
                            final String subject = "Unable to open server socket (" + host + "," + port + ")";
                            ATMServer.sendNotification(subject, logMessage);
                        }
                        LOGGER.warn(FormatData.fullStackTrace(ex));
                    }
                }
            }
        } else { // client socket -- connecting to entity

            clientSocket = true;
            while (!shouldTerminate && (activeSockets.size() < this.maxActiveSockets)) {
                activeSocket = new ActiveSocket(this);
                newSocket = null;
                if (this.isInForcedStandIn()) {
                    LOGGER.info("Forced standin " + getName());
                    try {
                        Thread.sleep(3000);
                    } catch (final InterruptedException ex) {
                        LOGGER.info("Interrupted while waiting for connection.");
                        LOGGER.warn(FormatData.fullStackTrace(ex));
                    }
                } else if (!shouldTerminate) {
                    if ((loopCounter % 120) == 0) {
                        try {
                            LOGGER.info("Connecting to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                            newSocket = new Socket(InetAddress.getByName(remoteHost), remotePort);
                            if (newSocket.isConnected()) {
                                tries = 0;
                                LOGGER.info("Client socket connected to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                            }
                            newSocket.setKeepAlive(true);
                            if (newSocket.getKeepAlive()) {
                                LOGGER.info("Client socket keep alive to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                            }
                            activeSocket.setSocket(newSocket);
                            activeSockets.add(activeSocket);
                            increaseConnects();
                            break;
                        } catch (final IOException ex) {
                            loopCounter++;
                            tries++;
                            LOGGER.info("SocketException while opening remote socket (" + remoteHost + "," + remotePort + ") " + " " + ex.getClass() + " " + ex.getMessage());

                            // Record only every 300th consecutive failure.
                            if ((tries % 300) == 0) {
                                recordErrorToDatabase(ex.getMessage());
                            }
                        }
                    } else {
                        // Between attempts: count the iteration and wait up to one second.
                        loopCounter++;
                        try {
                            synchronized (clientConnectLock) {
                                clientConnectLock.wait(1000);
                            }
                        } catch (final InterruptedException inex) {
                            LOGGER.info("SocketException while opening remote socket " + Thread.currentThread().getName());
                            LOGGER.warn(FormatData.fullStackTrace(inex));
                            if (!this.shouldTerminate) {
                                recordErrorToDatabase("InterruptedException without terminate set.");
                            }
                        }
                    }
                }
            }
        }
        try {
            // here, if we created a new ActiveSocket, establish dataInput and dataOuput streams
            // for Discover, this will mean adding up to MaxActiveSockets # of sockets to the activeSockets list.
            // for each other SwitchChannel, this will be the only activeSocket
            //
            // The loops above can exit with a freshly constructed ActiveSocket but no
            // socket (e.g. termination requested mid-attempt), so require both before
            // touching newSocket.
            if ((activeSocket != null) && (newSocket != null)) {
                LOGGER.info("Creating serverIn/serverOut data streams " + Thread.currentThread().getName());
                DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(newSocket.getInputStream()));
                if (newSocket.isConnected()) {
                    LOGGER.info("socket still connected to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                }
                DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(newSocket.getOutputStream(), 2048));
                activeSocket.setDataInputStream(dataInputStream);
                activeSocket.setDataOutputStream(dataOutputStream);
                activeSocket.setNumReceivers(this.numReceivers);
                ReceiveQueuer[] receiveQueuers = activeSocket.getReceiveQueuers();
                LOGGER.info("Starting receive queuers");
                for (int cnt = 0; cnt < numReceivers; cnt++) {
                    receiveQueuers[cnt].setName(this.systemName + "-Socket-" + Integer.toString(activeSockets.size()) + "-ReceiveQueuer-" + Integer.toString(cnt));
                    receiveQueuers[cnt].setActiveSocket(activeSocket);
                    receiveQueuers[cnt].start();
                }
            }
            // Client mode stops after one pass; this is skipped if stream setup threw.
            if (clientSocket) {
                break;
            }
        } catch (final Exception ex) {
            // NOTE(review): the ActiveSocket was already added to activeSockets before
            // stream setup, so on failure it stays registered without streams - confirm
            // whether it should be removed here.
            LOGGER.info("Exception while creating input/output streams " + Thread.currentThread().getName());
            LOGGER.warn(FormatData.fullStackTrace(ex));
        }
    }
    if (!shouldTerminate) {
        LOGGER.info("Socket connection complete " + Thread.currentThread().getName());
    } else {
        LOGGER.info("Stopped establishing socket connection " + Thread.currentThread().getName());
    }

}

+0

处理多个套接字的基本思路是:让一个线程专门负责接受连接,然后为每个连接生成一个“处理程序”线程来处理它(把新接受的套接字传给该线程)。恐怕这里需要进行较大的重构。 – Fildor

回答

0

使用多个套接字最简单的方法是为每个套接字分配一个新线程。不要试图在一个线程上处理不同的套接字,那样只会让自己很为难。