Socket编程及mina框架简单示例

要实现客户端与服务器的长连接,可以使用socket的方式连接服务器与客户端。在这篇文章中,将用原生的方式实现socket的服务器端和客户端,然后用Mina框架再实现一次。

原生方式上:

客户端可实现如下:

SocketClient:

package socketClient;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStreamWriter;

import java.net.Socket;

public class SocketClient {

public int port = 9898;

public String hostAddress = "127.0.0.1";

public static void main(String[] args) {

SocketClient client = new SocketClient();

client.start();

}

private void start() {

BufferedReader inputReader = null;

OutputStreamWriter output = null;

Socket socket = null;

try {

socket = new Socket(hostAddress, port);

inputReader = new BufferedReader(new InputStreamReader(System.in));

output = new OutputStreamWriter(socket.getOutputStream());

String inputContent;

int count = 0;

while (!(inputContent = inputReader.readLine()).equals("bye")) {

output.write(inputContent);

output.write("\n");

output.flush();

getServerMsg(socket);

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

output.close();

inputReader.close();

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void getServerMsg(Socket socket) {

new Thread(new Runnable() {

@Override

public void run() {

BufferedReader reader = null;

try {

reader = new BufferedReader(new InputStreamReader(

socket.getInputStream()));

String serverMsg;

while ((serverMsg = reader.readLine()) != null) {

System.out.println("server say: " + serverMsg);

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

reader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}).start();

}

}

服务器端:

package com.socket.tra;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStreamWriter;

import java.net.ServerSocket;

import java.net.Socket;

public class SocketServer {

public static void main(String[] args) {

SocketServer server = new SocketServer();

server.startServer();

}

private void startServer() {

ServerSocket serverSocket = null;

Socket socket = null;

try {

serverSocket = new ServerSocket(9898);

while (true) {

socket = serverSocket.accept();

System.out.println(socket.hashCode() + " is connect");

connect(socket);

}

} catch (IOException e) {

e.printStackTrace();

}

}

private void connect(final Socket socket) {

new Thread(new Runnable() {

public void run() {

BufferedReader reader = null;

OutputStreamWriter writer = null;

try {

reader = new BufferedReader(new InputStreamReader(

socket.getInputStream()));

writer = new OutputStreamWriter(socket.getOutputStream());

String msg;

while ((msg = reader.readLine()) != null) {

System.out.println(socket.hashCode()+"say: "+msg);

writer.write(msg + "\n");

writer.flush();

}

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

writer.close();

reader.close();

socket.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}).start();

}

}

这是socket基本的用法,但在实际开发中,我们一般用封装好的框架来实现,Apache mina就是能够帮助用户开发高性能和高伸缩性网络应用程序的框架。它通过Java nio技术基于TCP/IP和UDP/IP协议提供了抽象的、事件驱动的、异步的API。

具体用法可以去官网了解下,这里提供一个简单的使用示例,实现跟上面原生方法同样的功能。

版本一:

服务器端:

package com.socket;

import java.io.IOException;

import java.net.InetSocketAddress;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class Main {

public static int port = 9898;

public static void main(String[] args) {

NioSocketAcceptor acceptor = new NioSocketAcceptor();

try {

//设置handler

acceptor.setHandler(new MyHandler());

//设置过滤器

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));

//绑定端口号

acceptor.bind(new InetSocketAddress(port));

} catch (IOException e) {

e.printStackTrace();

}

}

}

MyHandler:

package com.socket;

import org.apache.mina.core.service.IoHandler;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

public class MyHandler implements IoHandler {

public void exceptionCaught(IoSession arg0, Throwable arg1)

throws Exception {

System.out.println("exception");

}

public void inputClosed(IoSession arg0) throws Exception {

System.out.println("inputClosed");

}

public void messageReceived(IoSession arg0, Object arg1) throws Exception {

String msg = (String) arg1;

System.out.println("messageReceived server: " + msg);

arg0.write(msg);

}

public void messageSent(IoSession arg0, Object arg1) throws Exception {

System.out.println("messageSent");

}

public void sessionClosed(IoSession arg0) throws Exception {

System.out.println("sessionClosed "+arg0.hashCode());

}

public void sessionCreated(IoSession arg0) throws Exception {

System.out.println("sessionCreated "+arg0.hashCode());

}

public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {

System.out.println("sessionIdle "+arg0.hashCode()+" , "+arg1);

}

public void sessionOpened(IoSession arg0) throws Exception {

System.out.println("sessionOpened "+arg0.hashCode());

}

}

客户端:

package socketClient.mina;

import java.io.BufferedReader;

import java.io.IOException;

import java.io.InputStreamReader;

import java.io.OutputStreamWriter;

import java.net.InetSocketAddress;

import java.net.Socket;

import org.apache.mina.core.future.ConnectFuture;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class SocketClient {

public int port = 9898;

public String hostAddress = "127.0.0.1";

public static void main(String[] args) throws IOException {

NioSocketConnector connector = new NioSocketConnector();

connector.setHandler(new MyClientHandler());

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));

ConnectFuture future = connector.connect(new InetSocketAddress("127.0.0.1", 9898));

future.awaitUninterruptibly();//等待连接

IoSession session = future.getSession();

BufferedReader inputReader = new BufferedReader(new InputStreamReader(System.in));

String inputContent;

while (!(inputContent = inputReader.readLine()).equals("bye")) {

session.write(inputContent);

}

}

}

MyClientHandler:

package socketClient.mina;

import org.apache.mina.core.service.IoHandler;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.core.session.IoSession;

public class MyClientHandler implements IoHandler {

public void exceptionCaught(IoSession arg0, Throwable arg1)

throws Exception {

System.out.println(arg1.getCause());

}

public void inputClosed(IoSession arg0) throws Exception {

// System.out.println("inputClosed");

}

public void messageReceived(IoSession arg0, Object arg1) throws Exception {

String msg = (String) arg1;

System.out.println("client messageReceived: " + msg);

}

public void messageSent(IoSession arg0, Object arg1) throws Exception {

System.out.println("client messageSent->" + (String)arg1);

}

public void sessionClosed(IoSession arg0) throws Exception {

System.out.println("sessionClosed "+arg0.hashCode());

}

public void sessionCreated(IoSession arg0) throws Exception {

System.out.println("sessionCreated "+arg0.hashCode());

}

public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception {

System.out.println("sessionIdle "+arg0.hashCode()+" , "+arg1);

}

public void sessionOpened(IoSession arg0) throws Exception {

System.out.println("sessionOpened "+arg0.hashCode());

}

}

版本一使用框架写好的TextLineCodecFactory来解析字符串,在实际实用场合中,往往要自定义解析功能,因此版本二自己写一个字符串解析功能。

版本二:

服务器端:

Main: 主函数

MyDecoder: 实现数据的解码

MyEncoder: 实现数据的编码

MyHandler:

MyProtocolFactory: 生成编码和解码器

MyCumulativeEncoder: 实现数据的编码,可将服务器数据进行缓存,防止数据丢失

Main:

package com.socket.r1;

import java.io.IOException;

import java.net.InetSocketAddress;

import org.apache.mina.core.session.IdleStatus;

import org.apache.mina.filter.codec.ProtocolCodecFilter;

import org.apache.mina.filter.codec.textline.TextLineCodecFactory;

import org.apache.mina.filter.logging.LoggingFilter;

import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class Main {

public static int port = 9898;

public static void main(String[] args) {

NioSocketAcceptor acceptor = new NioSocketAcceptor();

try {

acceptor.setHandler(new MyHandler());

acceptor.getFilterChain().addLast("logger", new LoggingFilter());

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocolCodecFactory()));

acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 125);

acceptor.bind(new InetSocketAddress(port));

} catch (IOException e) {

e.printStackTrace();

}

}

}

MyDecoder:

package com.socket.r1;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyDecoder implements ProtocolDecoder {

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput output)

throws Exception {

//记录字符流的读取位置

int startPosition = in.position();

while(in.hasRemaining()){

byte b = in.get();

if(b == '\n'){

int curPosition = in.position();

//记录字符流的末位置

int limit = in.limit();

//将读取指针设置为初始位置

in.position(startPosition);

//将结束位置设置为当前读取位置

in.limit(curPosition);

IoBuffer buf = in.slice();

byte[] bytes = new byte[buf.limit()];

//将截取的内容放进bytes数组

buf.get(bytes);

String str = new String(bytes);

output.write(str);

in.position(curPosition);

in.limit(limit);

}

}

}

public void dispose(IoSession arg0) throws Exception {

System.out.println("dispose" + arg0.hashCode());

}

public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)

throws Exception {

System.out.println("finishDecode" + arg0.hashCode());

}

}

MyEncoder:

package com.socket.r1;

import java.nio.charset.Charset;

import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolEncoder;

import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyEncoder implements ProtocolEncoder {

public void dispose(IoSession arg0) throws Exception {

System.out.println("dispose" + arg0.hashCode());

}

public void encode(IoSession arg0, Object msg, ProtocolEncoderOutput output)

throws Exception {

String s= null;

if(msg instanceof String){

s = (String) msg;

}

if(s!=null){

CharsetEncoder charsetEncoder = (CharsetEncoder) arg0.getAttribute("encoder");

if(charsetEncoder ==null){

charsetEncoder = Charset.defaultCharset().newEncoder();

arg0.setAttribute("encoder",charsetEncoder);

}

IoBuffer ioBuffer = IoBuffer.allocate(s.length());

ioBuffer.setAutoExpand(true);

ioBuffer.putString(s, charsetEncoder);

ioBuffer.flip();

output.write(ioBuffer);

}

}

}

MyHandler跟版本一的一样,这里就不贴代码了。

MyCumulativeDecoder :

package com.socket.r1;

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyCumulativeDecoder extends CumulativeProtocolDecoder {

/**

* 确认读取完成时return true

*/

@Override

protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput output)

throws Exception {

int startPosition = in.position();

while(in.hasRemaining()){

byte b = in.get();

if(b == '\n'){

int curPosition = in.position();

int limit = in.limit();

in.position(startPosition);

in.limit(curPosition);

IoBuffer buf = in.slice();

byte[] bytes = new byte[buf.limit()];

buf.get(bytes);

String str = new String(bytes);

output.write(str);

in.position(curPosition);

in.limit(limit);

return true;

}

}

//取消此次的读取,将读取位置重置

in.position(startPosition);

return false;

}

}

MyProtocolCodecFactory :

package com.socket.r1;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyProtocolCodecFactory implements ProtocolCodecFactory {

// private MyDecoder decoder;

private MyCumulativeDecoder decoder;

private MyEncoder encoder;

public MyProtocolCodecFactory() {

// decoder = new MyDecoder();

decoder = new MyCumulativeDecoder();

encoder = new MyEncoder();

}

public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {

return decoder;

}

public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {

return encoder;

}

}

到此结束!!!!

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181210G00KZF00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券