概述
Client
作为客户端启动,每个客户端单独开启 Send 和 Receive 两个线程,分别负责发送和接收消息。
Server
作为服务器端启动,每连接上一个客户端,就新建一个 Channel 线程,负责接收该客户端的消息并广播给所有已连接的客户端。
Client
/**
 * Chat client entry point: connects to the server on 127.0.0.1:7777 and
 * starts one thread for sending and one thread for receiving messages.
 */
public class Client {
    /**
     * Opens the connection, announces the local port and launches the
     * sender and receiver workers on the shared socket.
     *
     * @param args ignored
     * @throws IOException if the connection or a worker's stream cannot be opened
     */
    public static void main(String[] args) throws IOException {
        final Socket connection = new Socket("127.0.0.1", 7777);
        final int localPort = connection.getLocalPort();
        System.out.println("用户 " + localPort + " 已连接...");

        // Sender is created and started before the receiver is constructed.
        final Thread sender = new Thread(new ClientSend(connection));
        sender.start();

        final Thread receiver = new Thread(new ClientReceive(connection));
        receiver.start();
    }
}
ClientSend
/**
 * Client-side sender: reads lines from standard input and writes each one to
 * the server, prefixed with the local port of the client socket.
 */
public class ClientSend implements Runnable {
    private final Socket client;
    private final PrintStream sendStream;
    private final BufferedReader systemIn;
    /** Set once reading or writing has failed and the streams were closed. */
    private boolean errorOccurred = false;
    /** Set once standard input has reached end-of-stream. */
    private boolean inputExhausted = false;

    /**
     * @param client connected socket whose output stream is used for sending
     * @throws IOException if the socket's output stream cannot be obtained
     */
    public ClientSend(Socket client) throws IOException {
        this.client = client;
        sendStream = new PrintStream(client.getOutputStream());
        systemIn = new BufferedReader(new InputStreamReader(System.in));
    }

    /**
     * Forwards standard-input lines to the server until standard input ends
     * or an I/O failure occurs. On failure the streams are closed and the
     * sender is marked as failed.
     */
    public void send() {
        try {
            String msg;
            while ((msg = systemIn.readLine()) != null) {
                sendStream.println(client.getLocalPort() + " 说 " + msg);
                // PrintStream never throws on write failure; it only records
                // the error, so it has to be polled explicitly.
                if (sendStream.checkError()) {
                    throw new IOException("failed to send message to server");
                }
            }
            // readLine() returned null: no more input will ever arrive.
            inputExhausted = true;
        } catch (IOException e) {
            e.printStackTrace();
            CloseIOUtil.close(sendStream, systemIn);
            errorOccurred = true;
        }
    }

    /**
     * Runs the send loop; returns when standard input is exhausted or an
     * error occurred, instead of spinning on an ended input stream.
     */
    @Override
    public void run() {
        while (!errorOccurred && !inputExhausted) {
            send();
        }
    }
}
ClientReceive
/**
 * Client-side receiver: reads lines sent by the server and prints each one
 * to standard output.
 */
public class ClientReceive implements Runnable {
    private final Socket client;
    private final BufferedReader receiveReader;
    /** Set once the connection has ended or failed; stops the run loop. */
    private boolean errorOccurred = false;

    /**
     * @param client connected socket whose input stream is read
     * @throws IOException if the socket's input stream cannot be obtained
     */
    public ClientReceive(Socket client) throws IOException {
        this.client = client;
        this.receiveReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
    }

    /**
     * Reads and prints a single line from the server. End-of-stream (the
     * server closed the connection) and I/O failures both close the reader
     * and stop the receiver.
     */
    private void receive() {
        try {
            String msg = receiveReader.readLine();
            if (msg == null) {
                // null means end-of-stream; every further readLine() would
                // return null immediately, so looping on would busy-spin.
                CloseIOUtil.close(receiveReader);
                errorOccurred = true;
                return;
            }
            System.out.println(msg);
        } catch (IOException e) {
            e.printStackTrace();
            CloseIOUtil.close(receiveReader);
            errorOccurred = true;
        }
    }

    /** Receives messages until the connection ends or fails. */
    @Override
    public void run() {
        while (!errorOccurred) {
            receive();
        }
    }
}
Server
/**
 * Chat server entry point: listens on port 7777 and starts one
 * {@code ServerChannel} thread per accepted client.
 */
public class Server {
    /**
     * Sockets of all currently registered clients. Channel threads iterate
     * this list to broadcast while other threads add and remove entries, so a
     * copy-on-write list is used: its iterators work on a snapshot and never
     * throw {@link java.util.ConcurrentModificationException}.
     */
    public static final List<Socket> SOCKETS = new java.util.concurrent.CopyOnWriteArrayList<>();

    /**
     * Accepts clients forever, registering each one and starting its channel.
     *
     * @param args ignored
     * @throws IOException if the listening socket cannot be opened or accepting fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the port if the accept loop ever fails.
        try (ServerSocket server = new ServerSocket(7777)) {
            while (true) {
                Socket s = server.accept();
                System.out.println("用户 "+s.getPort() + " 已连接...");

                ServerChannel channel;
                try {
                    channel = new ServerChannel(s);
                } catch (IOException e) {
                    // A single broken client must not take the server down.
                    e.printStackTrace();
                    try {
                        s.close();
                    } catch (IOException ignored) {
                        // Best effort: the socket is being discarded anyway.
                    }
                    continue;
                }

                // Register only clients whose channel was set up successfully.
                SOCKETS.add(s);
                new Thread(channel).start();
            }
        }
    }
}
ServerChannel
/**
 * Server-side handler for one client: reads lines from that client and
 * broadcasts each line to every socket registered in {@code Server.SOCKETS}.
 */
public class ServerChannel implements Runnable {
    private final Socket msgSocket;
    private final BufferedReader receive;
    /** Output stream to this channel's own client. */
    private final PrintStream send;
    /** Set once this channel's connection has ended or failed. */
    private boolean errorOccurred = false;

    /**
     * @param msgSocket connected client socket served by this channel
     * @throws IOException if the socket's streams cannot be obtained
     */
    public ServerChannel(Socket msgSocket) throws IOException {
        this.msgSocket = msgSocket;
        this.receive = new BufferedReader(new InputStreamReader(msgSocket.getInputStream()));
        this.send = new PrintStream(msgSocket.getOutputStream());
    }

    /**
     * Reads one line from this channel's client.
     *
     * @return the line read, or {@code null} if the client closed the
     *         connection or reading failed (in the failure case the channel
     *         is also disconnected)
     */
    public String receive() {
        try {
            return receive.readLine();
        } catch (IOException e) {
            e.printStackTrace();
            disconnect();
        }
        return null;
    }

    /**
     * Broadcasts {@code msg} to every registered client. A client that cannot
     * be written to is dropped from the registry; only a failure on this
     * channel's own socket stops this channel.
     *
     * @param msg message to broadcast; {@code null} is ignored
     */
    private void send(String msg) {
        if (msg == null) {
            return;
        }
        System.out.println("收到消息--> "+msg);

        // Iterate over a snapshot so that removing dead clients (here or in
        // other channel threads) cannot invalidate the iteration.
        Socket[] targets = Server.SOCKETS.toArray(new Socket[0]);
        for (Socket s : targets) {
            try {
                PrintStream out = (s == msgSocket) ? send : new PrintStream(s.getOutputStream());
                out.println(msg);
                // PrintStream records write failures instead of throwing.
                if (out.checkError()) {
                    throw new IOException("failed to deliver message to client on port " + s.getPort());
                }
                System.out.println("发出消息--> "+msg);
            } catch (IOException e) {
                e.printStackTrace();
                if (s == msgSocket) {
                    disconnect();
                } else {
                    dropPeer(s);
                }
            }
        }
    }

    /** Closes this channel's streams, deregisters its socket and stops the loop. */
    private void disconnect() {
        CloseIOUtil.close(send, receive);
        Server.SOCKETS.remove(msgSocket);
        errorOccurred = true;
    }

    /** Deregisters and closes another client's socket after a failed write. */
    private void dropPeer(Socket peer) {
        Server.SOCKETS.remove(peer);
        try {
            peer.close();
        } catch (IOException ignored) {
            // Best effort: the peer is already considered gone.
        }
    }

    /**
     * Receives and broadcasts messages until the client disconnects or an
     * error occurs on this channel.
     */
    @Override
    public void run() {
        while (!errorOccurred) {
            String msg = receive();
            if (msg == null) {
                // End-of-stream: the client is gone. Clean up unless
                // receive() already did so because of an I/O error.
                if (!errorOccurred) {
                    disconnect();
                }
                break;
            }
            send(msg);
        }
    }
}