JavaSE-网络编程
并发量一高,就会出现通讯变慢等情况。
要解决这一问题,可以使用 Java NIO 编程,即采用非阻塞方式,通过通道(Channel)和选择器(Selector)来实现。
NetChartDemo_1
src
com.ztl.controler
SystemController();//系统启动器
com.ztl.iface.parser
IMessageParser();
com.ztl.iface.thread
IServerSocketThread();
com.ztl.impl.parser
MessageParserImpl();
com.ztl.impl.thread
ServerSocketThreadImpl(); //改名为SocketServerRunnableImpl();
ClientSocketRunnable(); //改名为ServerClientSocketRunnable();
UserClientSocketRunnable(); //copy ClientSocketRunnable();
DaemonThread(); //守护线程部分,负责打印统计信息,报告信息等
com.ztl.manager
MessageQueueManager(); //消息队列管理器
MessageParserManager(); //消息解析器
com.ztl.pojos
MessageQueuePojo(); //消息队列pojo类
com.ztl.utils
SystemParas(); //系统参数配置工具类
ReadConfigUtil(); //读取配置文件的工具类
StaticValue(); //静态变量定义工具类
test //测试 Source Folder
resources //配置文件夹 Source Folder
application.properties
lib
application.properties:
#net chat config
node_master=true
# node_master=false
server_socket_port=9999
server_socket_wait_accept_max_pool=5
nick_name=天亮教育
server_socket_ip = 127.0.0.1
max_connection_client_number=10
pubilc class ReadConfigUtil{
private Properties config = null;
public ReadConfigUtil(String configFile){
InputStream in = ReadConfigUtil.class.getClassLoader()
.getResourceAsStream("application.properties");
config = new Properties();
try{
Reader reader = new InputStreamReader(in,StaticValue.default_encoding));
config.load(reader);
reader.close();
}catch(IOException e){
sout("none properties");
}
}
//根据key读取value
public String getValue(String key){
// Properties props = new Properties();
try{
String value = config.getProperty(key);
return value;
}catch(Exception e){
e.printStackTrace();
sout("ConfigInfoError" + e.toString());
return null;
}
}
psvm(){
sout();
}
}
public class SystemParas{
//初始化配置文件读取类
public static ReadConfigUtil configUtil = new ReadConfigUtil("application.properties");
//读取出各配置项,以备任何该项目中的类使用
public static boolean is_node_master = Boolean.parserBoolean(configUtil.getValue("node_master"));
public static String nick_name = configUtil.getValue("nick_name");
public static int max_connection_client_number = Integer.parserInt(configUtil.getValue("max_connection_client_number"));
public static int server_socket_port = Integer.parserInt(configUtil.getValue("server_socket_port"));
public static int server_socket_wait_accept_max_pool = Integer.parserInt(configUtil.getValue("server_socket_wait_accept_max_pool"));
public static String server_socket_ip = configUtil.getValue("server_socket_ip");
main(){
sout(configUtil.getValue("nick_name"));
sout(configUtil.getValue("node_master"));
sout(configUtil.getValue("max_connection_client_number"));
}
}
/**
 * Shared constant-like values used across the chat demo.
 */
public class StaticValue{
    /** Character encoding used for every reader and writer in the project. */
    public static String default_encoding = "utf-8";

    /** Line terminator appended to each message sent over a socket. */
    public static String sepratar_next_line = "\n";
}
socket server:
/**
 * Interface for the server socket thread. It currently declares no methods.
 */
public interface IServerSocketThread{
}
public class ServerSocketThreadImpl implements Runnable{
private String nickName;
private ServerSocket serverSocket;
private MessageQueueManager messageQueueManager;
//private ThreadGroup threadGroup; //线程组,用来统一管理client,socket的各个线程
private List<ServerClientSocketRunnable> serverClientList;
private boolean isRunning = true; //状态位
public ServerSocketThreadImpl(String nickName,ServerSocket serversocket){
this.nickName = nickName;
this.serverSocket = serverSocket;
messageQueueManager = new MessageQueueManager();
//开启管理员向user client端发送消息
AdminWriteMessageRunnable adminWriteMessageRunnable = new AdminWriteMessageRunnable(messageQueueManager);
new Thread(adminWriteMessageRunnable).start();
//this.threadGroup = new ThreadGroup("socket_client_group");
serverClientList = new LinkedList<ServerClientSocketRunnable>();
this.isRunning = true;
//在socket server启动守护线程
DaemonThread daemonThread = new DaemonThread(serverClientList,messageQueueManager);
new Thread(daemonThread).start();
}
setter and getter
//实现server socket的主要处理逻辑即可
@Override
public void run(){
while(isRunning){
Socket client_socket = null;
try{
client_socket = serverSocket.accept(); //下一步封装后再加进线程组
//即server client线程
ClientSocketRunnable clientSocketRunnable = new ClientSocketRunnable(
null,client_socket,this.messageQueueManager);
this.serverClientList.add(clientSocketRunnable);
new Thread(clientSocketRunnable).start();
sout("one client is online!");
}catch(){
xxx
}
}
}
// 管理员要写给各客户端的runnable类
class AdminWriteMessageRunnable implements Runnable{
private MessageQueueManager messageQueueManager;
private BufferedReader bufferReader;
private boolean isRunnable = true;
public AdminWriteMessageRunnable(MessageQueueManager messageQueueManager){
this.messageQueueManager = messageQueueManager;
this.isRunnable = true;
try{
this.bufferReader = new BufferedReader(new InputStreamReader(System.in.getInputStream(),
StaticValue.default_encoding));
}catch(){
xxx
}
}
@Override
public void run(){
String temp_line = null;
while(isRunnable){
try{
temp_line = this.bufferReader.readLine();
sout("admin by server to client---" + temp_line);
}catch(){
xxx
}
}
}
}
}
//server clinet runnable. 改名为ServerClientSocketRunnable
public class ClientSocketRunnable implements Runnable{
private Socket clientSocket;
private String nickName;
//这里如果想随意写或者随意读,就要独立出来
private BufferedWriter bufferWriter;
private BufferedReader bufferReader;
setter and getter
//封装的由server client向user client发送消息的方法
public void writerToUserClient(String message){
try{
this.bufferWriter.write(message);
this.bufferWriter.flush();
}catch(){
xxx
}
}
private MessageParserManager messageParserManager;
private boolean isRunnable = true;
private MessageQueueManager messageQueueManager;
public ClientSocketRunnable(String nickName, Socket clientSocket,MessageQueueManager messageQueueManager){
this.nickName = nickName;
this.clientSocket = clientSocket;
this.messageParserManager = new MessageParserManager();
this.messageQueueManager = messageQueueManager;
this.isRunnable = true;
try{
this.bufferReader = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream(),
StaticValue.default_encoding));
this.bufferWriter = new BufferedWriter((new OutputStreamWriter(this.clientSocket.getOutpuStream,
StaticValue.default_encoding));
}catch(){
xxx
}
}
@Override
public void run(){
String temp_line = null;
while(isRunnable){
try{
temp_line = this.bufferReader.readLine();
messageQueueManager.addOneMessage(temp_line);
sout("server from client message----" + temp_line);
}catch{
}
}
}
/*
class ServerClientWriteRunnable implements Runnable{
private BufferedWriter bufferWriter;
public ServerClientWriteRunnable(BufferedWriter bufferWriter){
}
}
*/
}
public class UserClientSocketRunnable implements Runnable{
private Socket clientSocket;
private String nickName;
//这里如果想随意写或者随意读,就要独立出来
private BufferedWriter bufferWriter;
private BufferedReader bufferReader;
private BufferedReader consoleBufferReader;
private MessageParserManager messageParserManager;
private boolean isRunnable = true;
private MessageQueueManager messageQueueManager;
public ClientSocketRunnable(String nickName, Socket clientSocket,MessageQueueManager messageQueueManager){
this.nickName = nickName;
this.clientSocket = clientSocket;
this.messageParserManager = new MessageParserManager();
this.isRunnable = true;
try{
this.consoleBufferReader = new BufferedReader(new InputStreamReader(System.in.getInputStream(),
StaticValue.default_encoding));
this.bufferReader = new BufferedReader(new InputStreamReader(this.clientSocket.getInputStream(),
StaticValue.default_encoding));
this.bufferWriter = new BufferedWriter((new OutputStreamWriter(this.clientSocket.getOutpuStream,
StaticValue.default_encoding));
//开启从服务端读取消息线程
ReadSocketServerRunnable readSocketServerRunnable=new ReadSocketServerRunnable(this.bufferReader);
new Thread(readSocketServerRunnable).start();
}catch(){
xxx
}
}
@Override
public void run(){
String temp_line = null;
while(isRunnable){
try{
temp_line = this.consoleBufferReader.readLine();
this.bufferWriter.write(temp_line + StaticValue.sepratar_next_line);
this.bufferWriter.flush();
sout("client to server message----" + temp_line);
}catch{
}
}
}
class ReadSocketServerRunnable implements Runnable{
private BufferedReader bufferReader;
private boolean isRunning = true;
public ReadSocketServerRunnable(BufferedReader bufferReader){
this.bufferReader = bufferReader();
isRunning = true;
}
@Override
public void run(){
String temp_line = null;
while(isRunning){
try{
temp_line = this.bufferReader.readLine();
sout("server to client---" + temp_line);
}catch(){
xxx
}
}
}
}
}
/**
 * Broadcast loop: takes messages from the queue one at a time and writes
 * each of them to every registered server-side client handler.
 */
public class DaemonThread implements Runnable{
    // Holds all client socket handlers
    //private ThreadGroup threadGroup;
    private List<ServerClientSocketRunnable> serverClientList;
    // Manager of the queue of messages waiting to be sent to all clients
    private MessageQueueManager messageQueueManager;
    private boolean isRunning = true;

    /**
     * @param serverClientList    live list of client handlers, shared with the accept loop
     * @param messageQueueManager source of the messages to broadcast
     */
    public DaemonThread(List<ServerClientSocketRunnable> serverClientList,MessageQueueManager messageQueueManager){
        //this.threadGroup = threadGroup;
        this.serverClientList = serverClientList;
        this.messageQueueManager = messageQueueManager;
        this.isRunning = true;
    }

    @Override
    public void run(){
        String message = null;
        while(isRunning){
            message = messageQueueManager.getOneMessage();
            // The list can gain a client while it is being iterated, which would
            // fail the iteration; lock on the list for the duration of the broadcast
            synchronized(serverClientList){
                for(ServerClientSocketRunnable serverClientSocketRunnable:serverClientList){
                    serverClientSocketRunnable.writeToUserClient(message+StaticValue.sepratar_next_line);
                }
            }
            System.out.println("daemon by server to client-- " + message);
        }
    }
}
/**
 * Blocking FIFO queue of text messages, guarded by this object's monitor.
 */
public class MessageQueuePojo{
    private LinkedList<String> messageList;

    public MessageQueuePojo(){
        this.messageList = new LinkedList<String>();
    }

    public synchronized LinkedList<String> getMessageList(){
        return this.messageList;
    }

    public synchronized void setMessageList(LinkedList<String> messageList){
        this.messageList = messageList;
    }

    /**
     * Appends a message and wakes every thread waiting in popMessage.
     *
     * @param oneMessage message to enqueue; {@code null} is ignored, because
     *                   poll() also returns null for an empty queue and a
     *                   stored null would be mistaken for "no message"
     */
    public void addMessage(String oneMessage){
        if(oneMessage == null){
            return;
        }
        synchronized(this){
            this.messageList.add(oneMessage);
            this.notifyAll(); // several consumers may be waiting
        }
    }

    /**
     * Removes and returns the oldest message, waiting until one is available.
     * An interrupt received while waiting does not abort the wait; the
     * interrupt status is restored before returning.
     *
     * @return the oldest message, never {@code null}
     */
    public String popMessage(){
        boolean interrupted = false;
        try{
            synchronized(this){
                while(this.messageList.isEmpty()){
                    try{
                        this.wait();
                    }catch(InterruptedException e){
                        // keep waiting for a message, but remember the interrupt
                        interrupted = true;
                    }
                }
                return this.messageList.poll();
            }
        }finally{
            if(interrupted){
                Thread.currentThread().interrupt();
            }
        }
    }
}
pubilc class MessageQueueManager{
private MessageQueuePojo messageQueue;
public MessageQueueManager(){
this.messageQueue = new MessageQueuePojo();
}
public String getOneMessage(){
return this.messageQueuePojo.popMessage();
}
public void addOneMessage(String message){
this.messageQueuePojo.addMessage(message);
}
}
public class SystemController{
psvm(){
if(SystemParas.is_node_master){//说明是服务节点
//port会被绑定,不需要bind
ServerSocket serverSocket = new ServerSocket(SystemParas.server_socket_port,SystemParas.server_socket_wait_accept_max_pool)
ServerSocketThraadImpl serverSocketRunnable = new ServerSocketThreadImpl(
SystemParas.nick_name,serverSocket);
Thread serverThread = new Thread(serverSocketRunnable);
serverThread.start();
sout("socket server have started");
}else{//说明是socket client节点
//这里测试的时候,要先开启server服务,再把node_master改为false。当client启动成功的时候,会反馈一条信息给server端
Socket socket = new Socket(SystemParas.server_socket_ip,SystemParas.server_socket_port);
ClientSocketRunnnable clientSocketRunnable = new ServerSocketThreadImpl(
SystemParas.nick_name,serverSocket);
Thread serverThread = new Thread(clientSocketRunnable);
serverThread.start();
sout("socket client have started");
}
}
}
/**
 * Contract for message parsers.
 */
public interface IMessageParser{
    /**
     * Parses one message.
     *
     * @param message text to parse
     * @return the parse result as text
     */
    String parser(String message);
}
/**
 * Placeholder parser: it does not parse anything yet.
 */
public class MessageParserImpl implements IMessageParser{
    /**
     * @param message text to parse (currently ignored)
     * @return always {@code null}
     */
    @Override
    public String parser(String message){
        return null;
    }
}
public class MessageParserManager{
private IMessageParser iMessageParser;
public MessageParserManager(){
this.iMessageParser = new MessageParserImpl();
}
pubilc String parser(String content){
return this.iMessageParser.parser(content);
}
}