2013-11-20 38 views
0

我试图实现一个停止并使用在同一台机器上进行通信的发送方和接收方等待ARQ。我的问题是使两个线程同时运行,然后在两个线程之间进行通信(可能使用Thread.notify())。目前,当我在两个类中运行两个独立的主方法时,我的代码已经工作,没有实现流控制协议。但是,当我尝试从单独的主方法运行代码时,我只能首先运行Receiver线程或发送方线程,其中任何一个都会导致代码无限期地等待。我一般新的线程,所以任何帮助,非常感谢!一次运行两个线程,然后在它们之间进行通信

发件人类:

import java.net.DatagramSocket; 
import java.net.DatagramPacket; 
import java.net.InetSocketAddress; 

import java.io.File; 
import java.io.FileInputStream; 
import tcdIO.*; 

/** 
* 
* Sending side of a communication channel. 
* 
* The start method splits an image into a number of packets and sends them to a given receiver. 
* The main method acts as test for the class by filling the destination host and port number and the source port number. 
* 
*/ 
public class Sender implements Runnable{ 
    static final int DEFAULT_SRC_PORT = 50000; 
    static final int DEFAULT_DST_PORT = 50001; 
    static final String DEFAULT_DST_HOST = "localhost"; 

    static final String FILENAME = "input.jpg"; 

    static final int MTU = 1500; 

    static Terminal terminal; 

    DatagramSocket socket; 
    InetSocketAddress dstAddress; 

    /** 
    * Constructor 
    * 
    */ 
    Sender() { 
     this(DEFAULT_DST_HOST, DEFAULT_DST_PORT, DEFAULT_SRC_PORT); 
    } 


    /** 
    * Constructor 
    * 
    * Attempts to create socket at given port and create an InetSocketAddress for the destinations 
    */ 
    Sender(String dstHost, int dstPort, int srcPort) { 
     try { 
      dstAddress= new InetSocketAddress(dstHost, dstPort); 
      socket= new DatagramSocket(srcPort); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    synchronized void sleep() { 
     try {this.wait(100);}catch(Exception e){e.printStackTrace();} 
    } 


    /** 
    * Sender Method 
    * 
    * Transmits a given image as a collection of packets; the first packet contains the size of the image as string. 
    */ 
    public void run() { 
     byte[] data= null; 
     DatagramPacket packet= null; 

     File file= null; 
     FileInputStream fin= null; 
     byte[] buffer= null; 
     int size; 
     int counter; 

     try { 
      file= new File(FILENAME);    // Reserve buffer for length of file and read file 
      buffer= new byte[(int) file.length()]; 
      fin= new FileInputStream(file); 
      size= fin.read(buffer); 
      if (size==-1) throw new Exception("Problem with File Access"); 
      terminal.println("File size: " + buffer.length + ", read: " + size); 

      data= (Integer.toString(size)).getBytes(); // 1st packet contains the length only 
      packet= new DatagramPacket(data, data.length, dstAddress); 
      terminal.println("Please press any key"); 
      terminal.readChar(); 
      socket.send(packet);    

      counter= 0; 
      do { 
       data= new byte[(counter+MTU<size) ? MTU : size-counter]; // The length of the packet is either MTU or a remainder 
       java.lang.System.arraycopy(buffer, counter, data, 0, data.length); 
       terminal.println("Counter: " + counter + " - Payload size: " + data.length); 

       packet= new DatagramPacket(data, data.length, dstAddress); 
       socket.send(packet); 
       this.sleep(); 
       counter+= data.length; 
      } while (counter<size); 

     terminal.println("Send complete"); 
    } 
    catch(java.lang.Exception e) { 
     e.printStackTrace(); 
    }  
} 



public static void main(String[] args) { 
    Sender s; 
    try {   
     String dstHost; 
     int dstPort; 
     int srcPort; 

     //dstHost= args[0]; 
     //dstPort= Integer.parseInt(args[1]); 
     //srcPort= Integer.parseInt(args[2]); 
     dstHost= DEFAULT_DST_HOST; 
     dstPort= DEFAULT_DST_PORT; 
     srcPort= DEFAULT_SRC_PORT; 

     terminal= new Terminal("Sender"); 

     s= new Sender(dstHost, dstPort, srcPort); 
     s.run(); 

     terminal.println("Program completed"); 
    } catch(java.lang.Exception e) { 
     e.printStackTrace(); 
    } 
} 


} 

接收机类:

import java.io.File; 
import java.io.FileOutputStream; 
import java.net.DatagramSocket; 
import java.net.DatagramPacket; 

import tcdIO.*; 

/** 
* Receiving side of a communication channel. 
* 
* The class provides the basic functionality to receive a datagram from a sender. 
* The main method acts as test for the class by filling the port number at which to receive the datagram. 
*/ 
public class Receiver implements Runnable{ 
    static final String FILENAME = "output.jpg"; 
    static final int DEFAULT_PORT = 50001; 
    static final int MTU = 1500; 
    static Terminal terminal; 

    DatagramSocket socket; 

    /** 
    * Constructor 
    * 
    */ 
    Receiver() { 
     this(DEFAULT_PORT); 
    } 


    /** 
    * Constructor 
    * 
    * Attempts to create socket at given port 
    */ 
    Receiver(int port) { 
     try { 
      socket= new DatagramSocket(port); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 


    /** 
    * Receiver Method 
    * 
    * Attempts to receive a number of packets that contain an image; the first packet contains the size of the image 
    */ 
    public void run() { 
     byte[] data; 
     byte[] buffer; 
     DatagramPacket packet; 
     int counter; 
     int size; 

     File file; 
     FileOutputStream fout; 

     try { 
      data= new byte[MTU]; // receive first packet with size of image as payload 
      packet= new DatagramPacket(data, data.length); 
      terminal.println("Waiting for incoming packets"); 
      socket.receive(packet);   

      data= packet.getData(); // reserve buffer to receive image 
      size= (Integer.valueOf(new String(data, 0, packet.getLength()))).intValue(); 
      terminal.println("Filesize:" + size); 
      buffer= new byte[size]; 

      counter= 0;   
      while(counter<size) { // receive packet and store payload in array 
       data= new byte[MTU]; 
       packet= new DatagramPacket(data, data.length); 
       socket.receive(packet); 
       terminal.println("Received packet - Port: " + packet.getPort() + " - Counter: " + counter + " - Payload: "+packet.getLength()); 

       System.arraycopy(data, 0, buffer, counter, packet.getLength()); 
       counter+= packet.getLength(); 
      } 

      file= new File(FILENAME);    // Create file and write buffer into file 
      fout= new FileOutputStream(file); 
      fout.write(buffer, 0, buffer.length); 
      fout.flush(); 
      fout.close(); 
     } 
     catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     }  
    } 


    /** 
    * Test method 
    * 
    * Creates an instance of the class Receiver 
    * 
    * @param args arg[0] Port number to receive information on 
    */
    public static void main(String[] args) { 
     Receiver r; 

     try { 
      terminal= new Terminal("Receiver"); 
      int port; 

      //port= Integer.parseInt(args[0]); 
      port= DEFAULT_PORT; 
      r= new Receiver(port); 
      r.run(); 

      terminal.println("Program completed"); 
     } catch(java.lang.Exception e) { 
      e.printStackTrace(); 
     } 
    } 
    */ 
} 

和主,这只是实例都和运行它们:

import tcdIO.Terminal; 


public class FlowControlMain { 

    /** 
    * 
    * 
    */ 
    public static void main(String[] args) { 

     Sender s; 
     Receiver r; 
     try{ 
      String dstHost= "localhost"; 
      int dstPort= 50001; 
      int srcPort= 50000; 

      Sender.terminal= new Terminal("Sender"); 
      Receiver.terminal = new Terminal("Receiver"); 

      s= new Sender(dstHost, dstPort, srcPort); 
      r = new Receiver(dstPort); 
      s.run(); 
      r.run(); 

     }catch(Exception e){ 
      e.printStackTrace(); 

     } 

    } 


} 

道歉代码的巨量,只是想给出一个完整的图片

回答

2

你没有使用线程,你在主线程中执行run()方法。

在自己Thread开始Runnable正确的方法是要么

Thread t = new Thread(myRunnable); 
t.start(); 

或使用一个ExecutorService,这是一个高一点的水平,并允许的东西,如线程池。

+0

非常感谢!他们之间的沟通方面会通知()工作吗? –

+0

取决于你的意思是沟通。等待/通知机制允许您控制线程,使一个(或多个线程)在继续之前等待另一个完成某个任务。 – Kayaman

+0

这正是我需要发生的事情,我需要发件人线程等待接收方在发送下一个之前确认收到数据包 –

相关问题