2010-04-21 99 views
4

我是多线程新手& Java编程的套接字编程。我想知道什么是实现2个线程的最佳方式 - 一个用于接收套接字,另一个用于发送套接字。如果我想要做的事听起来很荒谬,请让我知道为什么!代码主要来自Sun的在线教程。我想使用多播套接字,以便我可以使用多播组。Java:多线程和UDP套接字编程

class Server extends Thread 
{ 

    static protected MulticastSocket socket = null; 
    protected BufferedReader in = null; 
    public InetAddress group; 

    private static class Receive implements Runnable 
    { 

     public void run() 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length); 
       socket.receive(pkt); 
       String received = new String(pkt.getData(),0,pkt.getLength()); 
       System.out.println("From [email protected]" + received);   
       Thread.sleep(1000); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 

    } 


    public Server() throws IOException 
    { 
     super("server"); 
     socket = new MulticastSocket(4446); 
     group = InetAddress.getByName("239.231.12.3"); 
     socket.joinGroup(group); 
    } 

    public void run() 
    { 

     while(1>0) 
     { 
      try 
      { 
       byte[] buf = new byte[256]; 
       DatagramPacket pkt = new DatagramPacket(buf,buf.length);   
       //String msg = reader.readLine(); 
       String pid = ManagementFactory.getRuntimeMXBean().getName(); 
       buf = pid.getBytes(); 
       pkt = new DatagramPacket(buf,buf.length,group,4446); 
       socket.send(pkt); 
       Thread t = new Thread(new Receive()); 
       t.start(); 

       while(t.isAlive()) 
       { 
        t.join(1000); 
       } 
       sleep(1); 
      } 
      catch (IOException e) 
      { 
       System.out.println("Error:"+e); 
      } 
      catch (InterruptedException e) 
      { 
       System.out.println("Error:"+e); 
      } 

     } 
     //socket.close(); 
    } 

    public static void main(String[] args) throws IOException 
    { 
     new Server().start(); 
     //System.out.println("Hello"); 
    } 

} 
+0

你最终的目标是什么? – Xailor 2010-04-21 23:48:43

+0

@Ravi,我修正了你的格式,但你应该编辑类名......让它们以大写字母开头。当你的班级名称以小写字母开头时,阅读你的代码是很痛苦的。 – Kiril 2010-04-22 00:49:49

+0

@Xepoch:我的最终目标是在分布式系统中实现某些协议 @Lirik:对于类名称感到抱歉!我现在修好了。 – Ravi 2010-04-22 12:44:22

回答

2

婉婷在应用程序中创建线程是不是荒谬的!您不需要完全2个线程,但我认为您正在讨论实现Runnable接口的2个类。

自Java 1.5以来,线程API已经变得更好,而且您不需要再乱用java.lang.Thread。您可以简单地创建一个java.util.concurrent.Executor并向其提交Runnable实例。

本书Java Concurrency in Practice使用了这个确切的问题 - 创建一个线程套接字服务器 - 并且遍历代码的几次迭代以显示最佳方式。查看免费样本章节,这非常棒。我不会在这里复制/粘贴代码,但是请特别注意列表6.8。

+0

谢谢德鲁,我不知道你能做到这一点!我会马上看看concurrentExecutor – Ravi 2010-04-22 12:43:12

+0

__Careful !!! __虽然在产生的线程中执行阻塞操作总是完全可以的(然后它会阻塞一段时间),但这样做可能是致命的在传递给'java.util.concurrent.Executor'的'Runnable'实例中。为什么?因为Executor不保证在另一个线程上运行代码。它也可以在调用线程上运行代码。从文档:“但是,Executor接口不严格要求执行异步。”。所以你也可以用这种方式阻止你的主线程,并且你可以很容易地锁定整个程序。 – Mecki 2015-03-06 18:34:25

+0

好的呼吁实施重要。一个同步impl的例子。将[Spring's SyncTaskExecutor](http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/task/SyncTaskExecutor.html) – Drew 2015-03-09 14:26:59

9

的第一件事是先:你的类应按照Java Naming Conventions大写字母开头:

类名应该是名词,采用大小写混合的 的第一个字母,每个单词大写。尝试 保持您的类名简单, 描述性。使用整个单词 - 避免使用 首字母缩写词和缩写(除非 的缩写比使用较长的格式(比如URL或 HTML)使用得更广泛的 )。

二: 尝试打破码成连贯的部分,并组织他们周围,你处理一些共同的特点......也许周围的功能或你的编程模型。

服务器的(基本)模型是它只有它接收套接字连接...服务器依赖于处理程序来处理这些连接,就是这样。如果试图建立一个模型,它会是这个样子:

class Server{ 
    private final ServerSocket serverSocket; 
    private final ExecutorService pool; 

    public Server(int port, int poolSize) throws IOException { 
     serverSocket = new ServerSocket(port); 
     pool = Executors.newFixedThreadPool(poolSize); 
    } 

    public void serve() { 
     try { 
     while(true) { 
      pool.execute(new Handler(serverSocket.accept())); 
     } 
     } catch (IOException ex) { 
     pool.shutdown(); 
     } 
    } 
    } 

    class Handler implements Runnable { 
    private final Socket socket; 
    Handler(Socket socket) { this.socket = socket; } 
    public void run() { 
     // receive the datagram packets 
    } 
} 

第三:我建议你看一些现有的例子。

每评论更新时间:
OK拉维,还有一些问题与您的代码和一些未成年人问题:

  1. 我认为Receive类是你的客户......你应该把它作为一个单独的程序(带有它自己的主类)并同时运行你的服务器和多个客户端。从你的服务器产生一个新的“客户端线程”,你发送的每一个新的UDP包都是一个令人不安的想法(big issue)。

  2. 当你让你的客户端应用程序,你应该让运行接收的代码在其自己的while环路(轻微问题),例如:

    public class Client extends Thread 
    { 
        public Client(/*..*/) 
        { 
         // initialize your client 
        } 
    
        public void run() 
        { 
         while(true) 
         { 
          // receive UDP packets 
          // process the UDP packets 
         } 
        } 
    
        public static void main(String[] args) throws IOException 
        { 
         // start your client 
         new Client().start(); 
        } 
    } 
    
  3. 你应该只需要每一个线程客户端和每个服务器一个线程(因为main有自己的线程,所以在技术上甚至没有单独的线程),所以你可能找不到有用的ExecutorService

否则,你的做法是正确的......但我仍然建议你看看一些例子。

+3

http://www.developer.com/ java/ent/article.php/3645111/Java-5s-BlockingQueue.htm - Doug Lea的'简单服务器' – 2010-04-22 03:01:28

+0

@John啊,是的......感谢John,那就是我一直在寻找的东西。 – Kiril 2010-04-22 03:19:47

+0

+1好答案!!!!! – 2010-04-22 03:29:01

0

2个线程很好。一位读者另一位作家。记住,使用UDP你不应该产生新的处理线程(除非你所做的需要很长时间),我建议把传入的消息放入处理队列中。对于发送来说也是一样的,有一个发送线程可以阻塞UDP传入队列。

1

这是一件很好的事情,即使是在一天之后,Eclipse的历史也能运行:)感谢这一点,我能够给这两个Ravi一个工作示例,并且Lirik就他的泄漏问题做出回答。

让我先说一说,我不知道是什么原因导致这种泄漏,但是如果我把它留下足够长的时间,它将会在OutOfMemoryError上失败。

其次,我将工作代码留给Ravi作为我的UDP服务器的工作基本示例。超时在那里测试我的防火墙将终止接收器的时间(30秒)。只要移除池中的任何东西,你就可以走了。

因此,这里是我的示例线程UDP服务器的工作,但泄漏版本。

public class TestServer { 

private static Integer TIMEOUT = 30; 
private final static int MAX_BUFFER_SIZE = 8192; 
private final static int MAX_LISTENER_THREADS = 5; 
private final static SimpleDateFormat DateFormat = new SimpleDateFormat("yyyy-dd-MM HH:mm:ss.SSSZ"); 

private int mPort; 
private DatagramSocket mSocket; 

// You can remove this for a working version 
private ExecutorService mPool; 

public TestServer(int port) { 
    mPort = port; 
    try { 
     mSocket = new DatagramSocket(mPort); 
     mSocket.setReceiveBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSendBufferSize(MAX_BUFFER_SIZE); 
     mSocket.setSoTimeout(0); 

     // You can uncomment this for a working version 
     //for (int i = 0; i < MAX_LISTENER_THREADS; i++) { 
     // new Thread(new Listener(mSocket)).start(); 
     //} 

     // You can remove this for a working version 
     mPool = Executors.newFixedThreadPool(MAX_LISTENER_THREADS); 

    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

// You can remove this for a working version 
public void start() { 
    try { 
     try { 
      while (true) { 
       mPool.execute(new Listener(mSocket)); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } finally { 
     mPool.shutdown(); 
    } 
} 

private class Listener implements Runnable { 

    private final DatagramSocket socket; 

    public Listener(DatagramSocket serverSocket) { 
     socket = serverSocket; 
    } 

    private String readLn(DatagramPacket packet) throws IOException { 
     socket.receive(packet); 
     return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(packet.getData())), MAX_BUFFER_SIZE).readLine(); 
    } 

    private void writeLn(DatagramPacket packet, String string) throws IOException { 
     packet.setData(string.concat("\r\n").getBytes()); 
     socket.send(packet); 
    } 

    @Override 
    public void run() { 
     DatagramPacket packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
     String s; 
     while (true) { 
      try { 
       packet = new DatagramPacket(new byte[MAX_BUFFER_SIZE], MAX_BUFFER_SIZE); 
       s = readLn(packet); 
       System.out.println(DateFormat.format(new Date()) + " Received: " + s); 
       Thread.sleep(TIMEOUT * 1000); 
       writeLn(packet, s); 
       System.out.println(DateFormat.format(new Date()) + " Sent: " + s); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public static void main(String[] args) { 
    if (args.length == 1) { 
     try { 
      TIMEOUT = Integer.parseInt(args[0]); 
     } catch (Exception e) { 
      TIMEOUT = 30; 
     } 
    } 
    System.out.println(DateFormat.format(new Date()) + " Timeout: " + TIMEOUT); 
    //new TestServer(4444); 
    new TestServer(4444).start(); 
} 
} 

btw。 @Lirik,我在Eclipse中首先目睹了这种行为,之后我从命令行对它进行了测试。再次,我不知道是什么导致它)对不起...