前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Mina入门实例

Mina入门实例

作者头像
全栈程序员站长
发布2022-07-10 10:22:28
4860
发布2022-07-10 10:22:28
举报

大家好,又见面了,我是全栈君。

继续上一篇,这篇主要讲通过mina往B端发送消息。并接受消息,mina是一个网络通信框架,封装了javaNIO。简单易用。网上有非常多关于他的介绍,在此不赘述了。

如上篇所介绍,完毕功能,须要五个类:

PoolListener:监听,用来在系统启动的时候创建连接。

SessionPool:连接池。

SendHandler:处理类。

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个port。每一个port可建立3个长连接。因此。在系统时,就要创建长连接,以下是一个监听类:

代码语言:javascript
复制
import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;/** * 初始化连接 * @author yuanfubiao * */public class PoolListener implements ServletContextListener {	@Override	public void contextDestroyed(ServletContextEvent sce) {			}	@Override	public void contextInitialized(ServletContextEvent sce) {		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");		SessionPool pool = new SessionPool();		try {						pool.init(nds_ip, nds_ports);		} catch (Exception e) {			e.printStackTrace();		}	}}

以下是监听配置,是配置在web.xml中:

代码语言:javascript
复制
    <display-name>Apache-Axis2</display-name>    <context-param>    	<param-name>nds_ip</param-name>    	<param-value>XX.XXX.XXX.XXX</param-value>    </context-param>    <context-param>    	<param-name>nds_ports</param-name>    	<param-value>12210,12211,12212,12213,12214,12215</param-value>    </context-param>    <listener>    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>    </listener>

以下是自己维护的一个连接池,相同使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码例如以下:

代码语言:javascript
复制
import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class SessionPool {		private static Log logger = LogFactory.getLog(SessionPool.class);	private static int connNum = 0;	private static String ip = null;	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();		/**	 * 初始化:读取配置文件。创建长连接	 * @throws Exception 	 */	public void init(String nds_ip,String nds_ports) throws Exception{		String[] ports = nds_ports.split(",");		ip = nds_ip;				for(int i=0;i<ports.length;i++){						int port = Integer.parseInt(ports[i]);			ConnectFuture future = null;						for(int j=0;j<3;j++){				String connNum = this.getConnNums();				logger.info("创建连接号---->>>>>" + connNum);				connNumPorts.put(connNum, port);				future = SessionPool.createConnect(ip, port);				if(future.isConnected()){					logger.info("创建连接------->" + future.getSession());					pool.put(connNum, future.getSession());				}else{					logger.error("连接创建错误,请检查IP和端口配置!" + future);				}						}		}	}		/**	 * 获取一个连接	 * @param num	 * @return	 */	public static IoSession  getSession(String strNum){				logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));				IoSession session = pool.get(strNum);				if(null == session || !session.isClosing()){			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));						if(!newConn.isConnected()){				newConn =  createConnect(ip,connNumPorts.get(strNum));			}			session = newConn.getSession();			pool.replace(strNum, session);		}				return session;	}		/**	 * 创建连接	 * @param ip	 * @param port	 * @return	 */	private static ConnectFuture createConnect(String strIp,int intPort){				IoConnector connector = new NioSocketConnector();				connector.getFilterChain().addLast("codec"				,new ProtocolCodecFilter(new CharsetCodecFactory()));				connector.setHandler(new SendHandler());				ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));		connector.getSessionConfig().setReadBufferSize(128);		future.awaitUninterruptibly();				return future;	}		/**	 * 生成连接序列号	 * @return	 */	private synchronized String getConnNums(){				if(18 == connNum){			connNum = 0;		}				connNum++;				return String.format("%02x", connNum);	}}

因此。在项目启动的时候就会有18个连接自己主动创建。并放在pool中等待我们的使用。

以下是业务处理类。须要继承IoHandlerAdapter类。而且实现以下几个方法:

代码语言:javascript
复制
import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import cm.custom.service.reception.RecResponse;import cm.custom.service.reception.ReceptionResponseServiceStub;/** * 业务处理 * @author yuanfubiao * */public class SendHandler extends IoHandlerAdapter {	private static Log logger = LogFactory.getLog(SendHandler.class);		@Override	public void exceptionCaught(IoSession session, Throwable cause)			throws Exception {		logger.error("连接出错", cause);	}	@Override	/**	 * 设置空暇时间	 */	public void sessionCreated(IoSession session) throws Exception {		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);	}		/**	 * 接受到消息后,通过WS发送给用户管理系统	 */	@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String result = message.toString().trim();		String temp = result.substring(0, result.length()-16).trim();		logger.info("接受到的数据:" + result);		//验证签名		String signature = null;		String securityKey = "12345678";		try {			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);			signature = new String(Hex.encodeHex(binSignature));		} catch (Exception e) {			e.printStackTrace();		}				String packet = temp + signature.toUpperCase().trim();				if(!result.equalsIgnoreCase(packet)){			logger.error("数字签名不对。错误指令:" + result);			return;		}		logger.info("接受到的数据:" + packet);		RecResponse res = new RecResponse();		res.setResponse(temp);		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();		stub.recResponse(res);	}		/**	 * 连接空暇时。发送心跳包	 */	@Override	public void sessionIdle(IoSession session, IdleStatus status)			throws Exception {		if(status == IdleStatus.BOTH_IDLE){				session.write("heartbeat");		}	}}

一般我们在写socket程序时。用堵塞的方式读取消息,通常是依据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完毕,在mina中,有默认的编解码方式。但也能够自己定义,比方以长度来推断一条消息是否读取完毕:

编码

代码语言:javascript
复制
import java.nio.charset.Charset;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;/** * 编码 * @author yuanfubiao * */public class CharsetEncoder extends ProtocolEncoderAdapter{		private final static Charset charset = Charset.forName("utf-8");		@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)			throws Exception {				IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);		buff.putString(message.toString(), charset.newEncoder());				buff.flip();		out.write(buff);	}}

解码

代码语言:javascript
复制
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if(in.remaining() >= 9){ //心跳为最小传输长度
			
			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}
			
			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;
			
			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

源代码下载: http://download.csdn.net/detail/stubbornpotatoes/7438435

关于mina发现一个系列好文章:http://xxgblog.com/2014/10/16/mina-netty-twisted-10/

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/115701.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年1月3,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档