用MINA实现UDP通信的例子

概述:

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。

UDP通信实现:

1、pom.xml,需要依赖:

               <!-- mina核心源码 -->
    <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
        <version>2.0.16</version>
    </dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-jdk14 -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-jdk14</artifactId>
  <version>1.7.7</version>
  <scope>test</scope>
</dependency>

2、MinaServer.java

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executors;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.ExpiringSessionRecycler;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.transport.socket.DatagramSessionConfig;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;

public class MinaServer {
static NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
public static void main(String[] args) throws Exception {
    acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
    // 设置handler,把这个IoHandler 注册到IoService
    acceptor.setHandler(new MinaServerHandler());
    acceptor.setSessionRecycler(new ExpiringSessionRecycler(15 * 1000));
    DatagramSessionConfig dcfg = acceptor.getSessionConfig();
    dcfg.setReuseAddress(true);
    dcfg.setReceiveBufferSize(1024);
    dcfg.setSendBufferSize(1024);
     //绑定端口
    acceptor.bind(new InetSocketAddress(2222));
    acceptor.bind(new InetSocketAddress(6000));

    System.out.println("Server Listening...");
}

public static Boolean SendMsg(String message, String sessionip, int port) {
    try {
        SocketAddress ra = new InetSocketAddress(sessionip, port);
        SocketAddress loc = new InetSocketAddress(2222);
        IoSession MySession = acceptor.newSession(ra, loc);
        byte[] res = hex2Byte(message);
        IoBuffer buf = IoBuffer.wrap(res);
        WriteFuture future = MySession.write(buf, ra);
        future.awaitUninterruptibly(100);
        if (future.isWritten()) {
            MySession.closeOnFlush();
            return true;
        } else {
            MySession.closeOnFlush();
            return false;
        }
    } catch (Exception ex) {
        return false;
    }
}

public static byte[] hex2Byte(String hex) {
    String digital = "0123456789ABCDEF";
    char[] hex2char = hex.toCharArray();
    byte[] bytes = new byte[hex.length() / 2];
    int temp;
    for (int i = 0; i < bytes.length; i++) {
        temp = digital.indexOf(hex2char[2 * i]) * 16;
        temp += digital.indexOf(hex2char[2 * i + 1]);
        bytes[i] = (byte) (temp & 0xff);
    }
    return bytes;
  }
}

3、MinaServerHandler.java

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;


public class MinaServerHandler extends IoHandlerAdapter {

public static String HEX = "0123456789ABCDEF";
public static int i = 0;
public static final CharsetDecoder decoder = (Charset.forName("UTF-8")).newDecoder();

@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    cause.printStackTrace();
    super.exceptionCaught(session, cause);
}

@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    //IoBuffer buffer = (IoBuffer) message;
    //byte[] bb = buffer.array();
    //String msg = bytesToHex(bb);
    System.out.println("i get msg:"+message.toString() + " ======> " + i++);
}

@Override
public void messageSent(IoSession session, Object message) throws Exception {
    super.messageSent(session, message);
}

public static String bytesToHex(byte[] bytes) {
    StringBuffer sb = new StringBuffer(bytes.length * 2);
    String Hex;
    for (byte b : bytes) {
        sb.append(HEX.charAt((b >> 4) & 0x0f));
        sb.append(HEX.charAt(b & 0x0f));
    }
    Hex=sb.toString();
    return Hex;
}
}

方法解释:

(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。 (3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。 (4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。

(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android 研究

OKHttp源码解析(三)--中阶之线程池和消息队列

android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死...

2363
来自专栏比原链

Derek解读Bytom源码-启动与停止

Gitee地址:https://gitee.com/BytomBlockchain/bytom

1243
来自专栏Janti

基础巩固——长连接 、短连接、心跳机制与断线重连

本文将从长连接和短连接的概念切入,再到长连接与短连接的区别,以及应用场景,引出心跳机制和断线重连,给出代码实现。

3591
来自专栏大内老A

ASP.NET MVC集成EntLib实现“自动化”异常处理[实例篇]

个人觉得异常处理对于程序员来说是最为熟悉的同时也是最难掌握的。说它熟悉,因为仅仅就是try/catch/finally而已。说它难以掌握,则是因为很多开发人员却...

21410
来自专栏Linux驱动

15.linux-LCD层次分析(详解)

如果我们的系统要用GUI(图形界面接口),这时LCD设备驱动程序就应该编写成frambuffer接口,而不是像之前那样只编写操作底层的LCD控制器接口。 什么是...

2656
来自专栏lzj_learn_note

Volley源码分析学习

2)根据SDK版本来创建HttpStack的实现,如果是2.3以上的,则使用基于HttpUrlConnection实现的HurlStack,反之,则利用Http...

1106
来自专栏木木玲

Netty 那些事儿 ——— 心跳机制

7489
来自专栏刘望舒

Android系统启动流程(四)Launcher启动过程与系统启动流程

前言 此前的文章我们学习了init进程、Zygote进程和SyetemServer进程的启动过程,这一篇文章我们就来学习Android系统启动流程的最后一步:L...

2398
来自专栏实战docker

修改,编译,GDB调试openjdk8源码(docker环境下)

在上一章《在docker上编译openjdk8》里,我们在docker容器内成功编译了openjdk8的源码,有没有读者朋友产生过这个念头:“能不能修改open...

5289
来自专栏搜云库

使用 Phoenix-4.11.0连接 Hbase 集群 ,并使用 JDBC 查询测试

什么是 Phoenix ? Apache Phoenix 是运行在Hbase之上的高性能关系型数据库,通过Phoenix可以像使用jdbc访问关系型数据库一样访...

4705

扫码关注云+社区

领取腾讯云代金券