我是一个试图学习网络编程和并发性的Java新手,我想我会尝试编写一个简单的聊天服务器,其中来自客户端的输入被回显给所有客户端。这没有发生。我添加了一些打印语句,以便程序会宣布它正在等待连接,并且每次都会收到连接。我在本地使用Telnet连接到我的机器上的端口。Java并发网络问题
该程序宣布第一个和第二个并发连接成功,但之后不会宣布后续连接成功,直到关闭所有连接。因此,例如,我将从五个独立的终端连接,并且该程序将宣布“连接1”和“连接2”,但将不会公布“连接3”,4和5,直到关闭所有终端。
我正在寻找帮助,找出我的错误在哪里以及如何处理这种情况的一般建议。
概括地说,我的程序有
- 主类,这将启动其他三个线程
- 一个ClientListener类,它采用了SocketReader侦听连接和存储插槽inputstreams和outputstreams在两套。
- 一个MessageReader,它遍历输入流。如果它找到一条消息,它将它放入一个SynchronousQueue中,并等待MessageWriter将其删除。 MessageWriter将消息发送到所有输出流。
代码如下。谢谢你的帮助!
public class Main {
public static void main(String[] args) {
ClientListener clientListener = new ClientListener();
Thread clientListenerThread = new Thread(clientListener);
clientListenerThread.setPriority(Thread.MAX_PRIORITY);
clientListenerThread.start();
MessageReader messageReader = new MessageReader(clientListener);
Thread messageReaderThread = new Thread(messageReader);
messageReaderThread.setPriority(Thread.MIN_PRIORITY);
messageReaderThread.start();
MessageWriter messageWriter = new MessageWriter(messageReader, clientListener);
Thread messageWriterThread = new Thread(messageWriter);
messageWriterThread.setPriority(Thread.NORM_PRIORITY);
messageWriterThread.start();
}
}
public class ClientListener implements Runnable {
private static final int DEFAULT_PORT = 5000;
private Set<Scanner> clientIn = Collections.synchronizedSet(
new LinkedHashSet<Scanner>());
private Set<PrintWriter> clientOut = Collections.synchronizedSet(
new LinkedHashSet<PrintWriter>());
public Set<Scanner> getClientIn() {
return clientIn;
}
public Set<PrintWriter> getClientOut() {
return clientOut;
}
@Override
public void run() {
try {
ServerSocket server = new ServerSocket(DEFAULT_PORT);
System.out.println("Listening for connections...");
int connectionNum = 0;
while(true) {
Socket socket = server.accept();
connectionNum++;
System.out.format("Connection %s%n", connectionNum);
Scanner in = new Scanner(socket.getInputStream());
PrintWriter out = new PrintWriter(socket.getOutputStream());
clientIn.add(in);
clientOut.add(out);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public class MessageReader implements Runnable {
private ClientListener clientListener;
private BlockingQueue<String> messages = new SynchronousQueue<String>();
public MessageReader(ClientListener clientListener) {
this.clientListener = clientListener;
}
@Override
public void run() {
while(true) {
Set<Scanner> clients = clientListener.getClientIn();
synchronized (clients) {
for(Scanner client: clients) {
if(client.hasNext()) {
try {
messages.put(client.next());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
public String getMessage() throws InterruptedException {
return messages.take();
}
}
public class MessageWriter implements Runnable {
private ClientListener clientListener;
private MessageReader messageReader;
public MessageWriter(
MessageReader messageReader,
ClientListener clientListener) {
this.messageReader = messageReader;
this.clientListener = clientListener;
}
@Override
public void run() {
try {
while(true) {
String message = messageReader.getMessage();
Set<PrintWriter> clients = clientListener.getClientOut();
synchronized (clients) {
for(PrintWriter client: clients) {
client.println(message);
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
看起来您不止一次在端口5000上创建服务器。这可能是你的问题吗?你是否将多个进程绑定到该端口? –
我如何多次创建服务器? – bsamek
对不起,我以为你的新作是在真正的时候。 –