Netty5实现接收服务端返回数据

1、开发spring boot微服务中,需要和第三方服务器做报文交换数据,用netty来实现客户端,并做一个同步接受数据。一下用的是netty5,其它版本的相似即可。

2、pom.xml引入

        <dependency>
		<groupId>io.netty</groupId>
		<artifactId>netty-all</artifactId>
		<version>5.0.0.Alpha1</version>
	</dependency>

3、ClientInitializer编写

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.ReadTimeoutHandler;
public class ClientInitializer extends ChannelInitializer<SocketChannel>{
	private CountDownLatch lathc;
	public ClientInitializer(CountDownLatch lathc) {
		this.lathc = lathc;
	}
	private ClientHandler handler;
	@Override
	protected void initChannel(SocketChannel sc) throws Exception {
		handler =  new ClientHandler(lathc);
		sc.pipeline().addLast(new StringDecoder());//进行字符串的编解码设置
		sc.pipeline().addLast(new StringEncoder());
		sc.pipeline().addLast(new ReadTimeoutHandler(60));//设置超时时间
		sc.pipeline().addLast(handler);
	}
	public String getServerResult(){
        return handler.getResult();
    }
    public void resetLathc(CountDownLatch lathc) {
        handler.resetLatch(lathc);
    }

}

4、ClientHandler编码实现

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
public class ClientHandler extends ChannelHandlerAdapter{
	
	private CountDownLatch lathc;
	private String result;
	public ClientHandler(CountDownLatch lathc) {
        this.lathc = lathc;
        }
	
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {

	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) {
		result = (String) msg;
		lathc.countDown();// 消息接收后释放同步锁,lathc是从Client加一传回来的
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
	public void resetLatch(CountDownLatch lathc) {
		this.lathc = lathc;
	}

	public String getResult() {
		return result;
	}

	
}

5、Client端运行主程序编写

import java.util.concurrent.CountDownLatch;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;;
public class Client {//编写客户端单例模式方便系统调用
	private static class SingletonHolder {
		static final Client instance = new Client();
	}
	public static Client getInstance(){
		return SingletonHolder.instance;
	}
	private EventLoopGroup group;
	private Bootstrap b;
	private ChannelFuture cf ;
	private ClientInitializer clientInitializer;
	private CountDownLatch lathc;
	private Client(){
		    lathc = new CountDownLatch(0);
		    clientInitializer = new ClientInitializer(lathc);
			group = new NioEventLoopGroup();
			b = new Bootstrap();
			b.group(group)
			 .channel(NioSocketChannel.class)
			 .handler(clientInitializer);
	}
	public void connect(){
		//192.168.43.51测试端口8766 192.168.43.102 线上端口8765
		try {
			this.cf = b.connect("127.0.01", 8888).sync();
			System.out.println("远程服务器已经连接, 可以进行数据交换..");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
		}
	}
	public ChannelFuture getChannelFuture(){
		if(this.cf == null) {
			this.connect();
		}
		if(!this.cf.channel().isActive()){
			this.connect();
		}
		return this.cf;
	}
	public void close(){
		try {
			this.cf.channel().closeFuture().sync();
			this.group.shutdownGracefully();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	 public String setMessage(String msg) throws InterruptedException{
	    	ChannelFuture cf =getInstance().getChannelFuture();//单例模式获取ChannelFuture对象
	    	cf.channel().writeAndFlush(msg);
	    	//发送数据控制门闩加一
	    	lathc = new CountDownLatch(1);
	        clientInitializer.resetLathc(lathc);
	        lathc.await();//开始等待结果返回后执行下面的代码
	        return clientInitializer.getServerResult();
	  }
	 public static void main(String[] args) throws Exception {
	    System.out.println(Client.getInstance().setMessage("123"));//测试等待数据返回
	 }
}

6、以上代码完整,直接复制粘贴即可使用

欢迎大家有问题和意见可以留言

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏芋道源码1024

熔断器 Hystrix 源码解析 —— 命令执行(二)之执行隔离策略

本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. HystrixThreadPoolProperties 3. HystrixThreadPoo...

4966
来自专栏LanceToBigData

Java多线程之细说线程池

前言   在认识线程池之前,我们需要使用线程就去创建一个线程,但是我们会发现有一个问题:    如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就...

5325
来自专栏進无尽的文章

编码篇-数据管理者Model

      Model是数据管理者和持有者,是数据解析层剥离ViewConyroller的关键所在。同是也是cell滑动不卡(省去每次解析)的好方式。

993
来自专栏Hongten

python开发_shelve_完整版_博主推荐

=====================================================

832
来自专栏你不就像风一样

Java中的并发工具类(CountDownLatch,CyclicBarrier,Semaphore,Exchanger)

在JDK的并发包里提供了很多有意思的并发工具类。CountDownLatch、CyclicBarrier和Semaphore 工具类提供了一种并发流程控制的手段...

1245
来自专栏我的小碗汤

自动评论csdn博客文章实现

今天我们来用java代码爬取csdn博客网站,然后自动评论,这一波操作可以说是相当风骚了,话不多说,咱上代码。

2002
来自专栏二进制文集

Java 生产者消费者实现 —— BlockingQueue

对着《Java 编程思想》,通过wait - notifyAll实现了生产者消费者模式。今天用BlockingQueue实现一下。

1904
来自专栏yukong的小专栏

【java并发编程实战2】无锁编程CAS与atomic包1、无锁编程CAS2、 atomic族类

如果V值等于E值,则将V的值设为N。若V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。通俗的理解就是CAS操作需要我们提供一个期望值,当期望...

1223
来自专栏JavaQ

高并发编程-CyclicBarrier深入解析

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法...

1K3
来自专栏文武兼修ing——机器学习与IC设计

队列及其实现队列队列的实现

队列 队列即FIFO,一言以蔽之就是先进先出。比如入队列的顺序是1,2,3,4,那么出队列的顺序也是1,2,3,4 队列的实现 软件——GO语言实现 除了使用链...

4177

扫码关注云+社区

领取腾讯云代金券