前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于Socket.IO的Client封装

基于Socket.IO的Client封装

作者头像
FunTester
发布2020-12-09 14:38:31
1.1K0
发布2020-12-09 14:38:31
举报
文章被收录于专栏:FunTesterFunTester

有了WebSocket的经验,这次写Socket.IOClient顺利了很多,参考之前的文章:socket接口开发和测试初探IntelliJ中基于文本的HTTP客户端基于WebSocket的client封装。之前的代码有更新,主要修复了一些BUG以及增加了一些功能方便在实际功能测试中使用,关于性能测试的,接下来还会在继续优化和多线程Socket接口的测试实践。

  • Gitee地址https://gitee.com/fanapi/tester
  • GitHub地址https://github.com/JunManYuanLong/FunTester

本次与WebSocket区别在于多记录了一些监听event的名称,不知道会有啥用,我猜将来用于做收到消息的响应业务的话,应该会用到,所以用看了一个public ConcurrentSet<String> events = new ConcurrentSet<>();记录。

关于send()方法,我并没有进行多个重载,测试代码中大家可以看到,我直接用的String类型的请求参数,然后转成JSON,打算后期直接把各种消息封装成不同的对象,所以只保留了一个send()方法。

代码语言:javascript
复制
    /**
     * 发送消息,暂不重载
     *
     * @param event
     * @param objects
     */
    public void send(String event, Object... objects) {
        events.add(event);
        this.socket.emit(event, objects);
    }

依赖

  • Gradle
代码语言:javascript
复制
// https://mvnrepository.com/artifact/io.socket/socket.io-client
compile group: 'io.socket', name: 'socket.io-client', version: '1.0.0'

  • Maven
代码语言:javascript
复制
<!-- https://mvnrepository.com/artifact/io.socket/socket.io-client -->
<dependency>
    <groupId>io.socket</groupId>
    <artifactId>socket.io-client</artifactId>
    <version>1.0.0</version>
</dependency>

ScoketIOFunClient

代码语言:javascript
复制
package com.fun.frame.socket;

import com.fun.base.exception.FailException;
import com.fun.config.SocketConstant;
import com.fun.frame.SourceCode;
import com.fun.utils.RString;
import io.netty.util.internal.ConcurrentSet;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Vector;

/**
 * 基于Socket.IO的Client封装对象
 */
public class ScoketIOFunClient extends SourceCode {

    private static Logger logger = LoggerFactory.getLogger(ScoketIOFunClient.class);

    public static IO.Options options = initOptions();

    public static Vector<ScoketIOFunClient> clients = new Vector<>();

    public LinkedList<String> msgs = new LinkedList<>();

    private String cname;

    private String url;

    public Socket socket;

    /**
     * 监听事件记录
     */
    public ConcurrentSet<String> events = new ConcurrentSet<>();


    private ScoketIOFunClient(String url, Socket socket) {
        this.url = url;
        this.socket = socket;
        clients.add(this);
    }

    /**
     * 获取socketClient实例
     *
     * @param url
     * @param cname
     * @return
     */
    public static ScoketIOFunClient getInstance(String url, String cname) {
        ScoketIOFunClient client = null;
        try {
            client = new ScoketIOFunClient(url, IO.socket(url, options));
            client.setCname(cname);
        } catch (URISyntaxException e) {
            FailException.fail();
        }
        return client;
    }


    /**
     * 初始化连接选项的方法,默认采取重置
     *
     * @return
     */
    public static IO.Options initOptions() {
        IO.Options options = new IO.Options();
        options.transports = SocketConstant.transports;
        //失败重试次数
        options.reconnectionAttempts = SocketConstant.MAX_RETRY;
        //失败重连的时间间隔
        options.reconnectionDelay = SocketConstant.RETRY_DELAY;
        //连接超时时间(ms)
        options.timeout = SocketConstant.TIMEOUT;
        return options;
    }

    /**
     * 注册通用的事件监听
     * {@link io.socket.client.Socket}
     */
    public void init() {
        this.socket.on(Socket.EVENT_CONNECTING, objects -> {
            logger.info("{} 正在连接...信息:{}", cname, initMsg(objects));
        });
        events.add(Socket.EVENT_CONNECTING);
        this.socket.on(Socket.EVENT_ERROR, objects -> {
            logger.info("{} 收到错误信息:{}", cname, initMsg(objects));
        });
        events.add(Socket.EVENT_ERROR);
        this.socket.on(Socket.EVENT_CONNECT_TIMEOUT, objects -> {
            logger.info("{} 连接超时!,url:{},信息:{}", cname, url, initMsg(objects));
        });
        events.add(Socket.EVENT_CONNECT_TIMEOUT);
        this.socket.on(Socket.EVENT_CONNECT_ERROR, objects -> {
            logger.info("{} 连接错误,信息:{}", cname, initMsg(objects));
        });
        events.add(Socket.EVENT_CONNECT_ERROR);
        /*此处统一的message做记录*/
        this.socket.on(Socket.EVENT_MESSAGE, objects -> {
            String msg = initMsg(objects);
            saveMsg(msg);
            logger.info("{} 收到消息事件,信息:{}", cname, msg);
        });
    }

    /**
     * 开始建立socket连接
     */
    public void connect() {
        this.socket.connect();
        logger.info("{} 开始连接...", cname);
        this.socket.connect();
        int a = 0;
        while (true) {
            if (this.socket.connected()) break;
            if ((a++ > SocketConstant.MAX_RETRY)) FailException.fail(cname + "连接重试失败!");
            SourceCode.sleep(SocketConstant.WAIT_INTERVAL);
        }
        logger.info("{} 连接成功!", cname);
    }

    /**
     * 添加监听事件
     *
     * @param event
     * @param fn
     */
    public void addEventListener(String event, Emitter.Listener fn) {
        events.add(event);
        this.socket.on(event, fn);
    }

    /**
     * 发送消息,暂不重载
     *
     * @param event
     * @param objects
     */
    public void send(String event, Object... objects) {
        events.add(event);
        this.socket.emit(event, objects);
    }

    /**
     * 关闭SocketClient
     */
    public void close() {
        logger.info("{} socket链接关闭!", cname);
        this.socket.close();
    }

    /**
     * 初始化收到的信息
     *
     * @param objects
     * @return
     */
    public static String initMsg(Object... objects) {
        if (ArrayUtils.isEmpty(objects)) return EMPTY;
        return Arrays.toString(objects);
    }

    /**
     * 该方法用于性能测试中,clone多线程对象
     *
     * @return
     */
    @Override
    public ScoketIOFunClient clone() {
        return getInstance(this.url, this.cname + RString.getString(4));
    }

    /**
     * 设置cname,多用于性能测试clone()之后
     *
     * @param cname
     */
    public void setCname(String cname) {
        this.cname = cname;
    }

    public String getCname() {
        return cname;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * 保存收到的信息,只保留最近的{@link SocketConstant}条
     *
     * @param msg
     */
    public void saveMsg(String msg) {
        synchronized (msgs) {
            if (msgs.size() > SocketConstant.MAX_MSG_SIZE) msgs.remove();
            msgs.add(msg);
        }
    }

    /**
     * 关闭所有socketclient
     */
    public static void closeAll() {
        clients.forEach(x ->
                {
                    if (x != null && x.socket.connected()) x.close();
                }
        );
        clients.clear();
        logger.info("关闭所有Socket客户端!");
    }


}

测试Demo

这次我学乖了,先用Java语言趟一趟浑水。

代码语言:javascript
复制
package com.fun.ztest.java;

import com.alibaba.fastjson.JSON;
import com.fun.frame.SourceCode;
import com.fun.frame.socket.ScoketIOFunClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Tdd extends SourceCode {

    private static Logger logger = LoggerFactory.getLogger(Tdd.class);

    public static void main(String[] args) throws InterruptedException {
        ScoketIOFunClient instance = ScoketIOFunClient.getInstance("http://ailearn-instruction-stress.xk12.cn:38899/?systemId=61951375269&loginType=3&token=4f99f5313c464070a40c709f72e8f72c&userType=1", DEFAULT_STRING);

        instance.connect();
        instance.addEventListener("my_response", objects -> {
            String s = ScoketIOFunClient.initMsg(objects);
            logger.info("{}收到my_response消息:{}", instance.getCname(), s);
        });

        String rege = "{\"cmd\": \"register\", \"userId\": 61951375269, \"role\": \"T\", \"deviceVersion\": \"1.0\", \"s_sid\": 123, \"token\": \"4f99f5313c464070a40c709f72e8f72c\"}";
        instance.send("my_event", JSON.parseObject(rege));
//        instance.send("my_event", JSON.parseObject(rege));
        String ss = "{\"cmd\": \"joinRoom\", \"roomId\": 8888}";
        instance.send("my_event", JSON.parseObject(ss));

        sleep(10);
        instance.close();
    }


}

控制台输出

代码语言:javascript
复制
INFO-> 当前用户:fv,IP:10.60.192.21,工作目录:/Users/fv/Documents/workspace/fun/,系统编码格式:UTF-8,系统Mac OS X版本:10.15.7
INFO-> FunTester 开始连接...
INFO-> FunTester 连接成功!
INFO-> FunTester收到my_response消息:[{"msg":"","code":0,"data":{"role":"T","s_sid":123,"deviceVersion":"1.0","userId":61951375269,"token":"4f99f5313c464070a40c709f72e8f72c"},"cmd":"registerResponse"}]
INFO-> FunTester收到my_response消息:[{"msg":"","code":0,"data":{"roomId":8888},"cmd":"joinRoomResponse"}]
INFO-> FunTester socket链接关闭!

Process finished with exit code 0


公众号「FunTester」,非著名测试开发,文章记录学习和感悟,欢迎关注,交流成长。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖
  • ScoketIOFunClient
  • 测试Demo
  • 控制台输出
    • 公众号「FunTester」,非著名测试开发,文章记录学习和感悟,欢迎关注,交流成长。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档