前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Android | TCP的C(Java|Android)/S(Java)通信实战经典聊天室案例(文末附本案例代码实现概述、观察者模式实现小结)

Android | TCP的C(Java|Android)/S(Java)通信实战经典聊天室案例(文末附本案例代码实现概述、观察者模式实现小结)

作者头像
凌川江雪
发布2019-11-05 17:37:15
5450
发布2019-11-05 17:37:15
举报
文章被收录于专栏:李蔚蓬的专栏李蔚蓬的专栏

案例GitHub地址

创建TCP服务端

  • 在sample模块下, 新建一个名为tcp的package, 创建TcpServer:

  • 指定服务端端口号(ip 默认为本机ip) 启动循环读取消息队列的子线程, 死循环,不断等待客户端请求连接, 一旦连接上, 直接新建一个子线程(丢给ClientTask)去处理这个socket, 于是主线程又可以回到accept() 阻塞,等待下一个连接请求; 同时,将连接上的socket 对应的线程类,注册为消息队列的观察者, 让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信。
代码语言:javascript
复制
/**
 * <pre>
 *     author : 李蔚蓬(简书_凌川江雪)
 *     time   : 2019/10/30 16:57
 *     desc   :指定服务端端口号(ip 默认为本机ip)
 *             启动循环读取消息队列的子线程,
 *             死循环,不断等待客户端请求连接,
 *             一旦连接上,直接新建一个子线程(丢给ClientTask)去处理这个socket,
 *             于是主线程又可以回到accept() 阻塞,等待下一个连接请求;
 *             同时,将连接上的socket 对应的线程类,注册为消息队列的观察者,
 *             让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信
 * </pre>
 */
public class TcpServer {

    public void start() {

        ServerSocket serverSocket = null;
        try {
            serverSocket = new ServerSocket(9090);
            MsgPool.getInstance().start();//启动读消息的子线程

            while (true) {
//            /*
//            阻塞的方法!!!  等待(客户端的) TCP 连接请求
//            客户端有 TCP 请求并连接上了 ServerSocket,.
//            那 accept() 就会返回一个 同一连接上 对应 客户一端socket 的 服务一端socket
//             */
                Socket socket = serverSocket.accept();

                //客户端连接之后,打印相关信息
//            System.out.println("ip: " + socket.getInetAddress().getHostAddress() +
//                    ", port = " + socket.getPort() + " is online...");
                System.out.println("ip = " + "***.***.***.***" +
                        ", port = " + socket.getPort() + " is online...");

//            /*
//                连接上了之后不能直接拿IO流去读写,
//                因为getInputStream() 和 getOutputStream() 都是阻塞的!!!!
//                如果直接拿IO 流,不做其他处理,
//                那么Server端的处理流程是这样的:
//                accept()-- getInputStream()处理第一个客户端 -- 处理完毕,accept()-- getInputStream()处理第二个客户端....
//                所以必须开启子线程去读写客户端,才能做成聊天室
//
//                针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信
//
//                过程:客户端连上之后,打印其信息,
//                然后直接新建一个子线程(丢给ClientTask)去处理这个socket,
//                于是主线程又可以回到accept() 阻塞,等待下一个连接请求
//             */
                ClientTask clientTask = new ClientTask(socket);
                MsgPool.getInstance().addMsgComingListener(clientTask);
                clientTask.start();


            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new TcpServer().start();
    }
}
  • 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信, 准备一个线程类,名为ClientTask, 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信, 这里便是线程类; run()中死循环不断读取本类实例对应的客户端发来的信息, 或者发送给对应的连接对面客户端(服务端)要发送的信息; 实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!
代码语言:javascript
复制
/**
 * <pre>
 *     author : 李蔚蓬(简书_凌川江雪)
 *     time   : 2019/10/30 17:23
 *     desc   :针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信,
 *             这里便是线程类;
 *             run()中死循环不断读取客户端发来的信息,发送给客户端(服务端)要发送的信息;
 *             实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!
 * </pre>
 */
public class ClientTask extends Thread implements MsgPool.MsgComingListener {

    private Socket mSocket;
    private InputStream mIs;
    private OutputStream mOs;

    public ClientTask(Socket socket) {

        try {
            mSocket = socket;
            mIs = socket.getInputStream();
            mOs = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void run() {
        BufferedReader br = new BufferedReader(new InputStreamReader(mIs));

        String line = null;
        /*
            读取并输出客户端信息。
            如果没有客户端发送信息,readLine() 便会阻塞在原地
         */
        try {
            while ((line = br.readLine()) != null) {
                System.out.println("read " + mSocket.getPort() + " = " + line);
                //把信息发送加入到消息队列,
                // 借助消息队列的被观察者通知方法,
                // 将消息转发至其他Socket(所有socket都在创建ClientTask的时候,
                // 备注成为MsgPool 的观察者)
                MsgPool.getInstance().sendMsg(mSocket.getPort() + ": " + line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //作为消息队列的观察者对应的更新方法,
    // 消息队列中最新的消息会推送通知到这里的msg参数,
    // 这里拿到最新的推送消息后,写进输出流,
    // 推到TCP 连接的客户一端的 socket
    @Override
    public void onMsgComing(String msg) {
        try {
            mOs.write(msg.getBytes());
            mOs.write("\n".getBytes());
            mOs.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 准备一个消息队列, 每一个Client发送过来的消息, 都会被加入到队列当中去, 队列中默认有一个子线程, 专门从队列中,死循环,不断去取数据(取出队列的队头), 取到数据就做相关处理,比如分发给其他的socket;
代码语言:javascript
复制
/**
 * <pre>
 *     author : 李蔚蓬(简书_凌川江雪)
 *     time   : 2019/10/30 17:45
 *     desc   :每一个Client发送过来的消息,
 *             都会被加入到队列当中去,
 *             队列中默认有一个子线程,
 *             专门从队列中,死循环,不断去取数据,
 *             取到数据就做相关处理,比如分发给其他的socket;
 * </pre>
 */
public class MsgPool {

    private static MsgPool mInstance = new MsgPool();

    /*
        这里默认消息是String类型,
        或者可以自行封装一个Model 类,存储更详细的信息

        block n.块; 街区;障碍物,阻碍
        顾名思义,这是一个阻塞的队列,当有消息过来时,就把消息发送给这个队列,
        这边会起一个线程专门从队列里面去取消息,
        如果队列中没有消息,就会阻塞在原地
     */
    private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();

    public static MsgPool getInstance() {
        return mInstance;
    }

    private MsgPool() {
    }

    //这是一个阻塞的队列,
    // 当有消息过来时,即客户端接收到消息时,
    // 就把消息发送(添加)到这个队列中
    //现在所有的客户端都可以发送消息到这个队列中
    public void sendMsg(String msg) {
        try {
            mQueue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    //要一早就调用本方法,
    // 启动这个读取消息的线程,在后台不断运行
    public void start() {
        //开启一个线程去读队列的数据
        new Thread() {
            @Override
            public void run() {
                //无限循环读取信息
                while (true) {
                    try {
                        //取出并移除队头;没有消息时,take()是阻塞的
                        String msg = mQueue.take();
                        notifyMsgComing(msg);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }

    //被观察者方法,遍历所有已注册的观察者,一次性通知更新
    private void notifyMsgComing(String msg) {
        for (MsgComingListener listener : mListeners) {
            listener.onMsgComing(msg);
        }
    }

    //观察者接口
    public interface MsgComingListener {
        void onMsgComing(String msg);//更新方法
    }

    //被观察者,存放观察者
    private List<MsgComingListener> mListeners = new ArrayList<>();

    //被观察者方法,添加观察者到列表
    public void addMsgComingListener(MsgComingListener listener) {
        mListeners.add(listener);
    }
}

所有的客户端都可发送消息到队列中, 然后所有的客户端都在等待 消息队列的消息新增(mQueue.put())这个时刻, 消息队列一新增消息, 即一接收到某个客户端发送过来消息(mQueue.put()), 则消息都会一次性转发给所有客户端, 所以这里涉及到一个观察者设计模式, 消息队列(MsgPool)或消息(Msg)是被观察者, 所有客户端处理线程(ClientTask)都是观察者 观察者模式实现小结: 观察者接口准备更新(数据或UI的)方法; 被观察者接口准备三个抽象方法; 观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据; 被观察者实现类准备一个观察者List以及实现三个方法: 1.观察者注册方法: 参数为某观察者,功能是把观察者参数加到观察者List中; 2.注销观察者方法: 参数为某观察者,功能是把观察者参数从观察者List中移除; 3.通知观察者方法:无参数或者把需要通知的数据作为参数, 功能是遍历所有已注册的观察者, 即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者! 使用时, 实例化一个被观察者和若干个观察者, 将所有观察者注册到被观察者处, 调用被观察者的通知方法,一次性更新所有已注册的观察者!

创建TCP客户端

创建两个Package,整理一下项目架构,创建一个TcpClient:

代码语言:javascript
复制
/**
 * <pre>
 *     author : 李蔚蓬(简书_凌川江雪)
 *     time   : 2019/10/31 15:36
 *     desc   :
 * </pre>
 */
public class TcpClient {

    private Scanner mScanner;

    public TcpClient() {
        mScanner = new Scanner(System.in);
        mScanner.useDelimiter("\n");
    }

    /**
     * 配置socket
     * 准备IO 流,
     * 主线程写,子线程读
     *
     */
    public void start() {
        try {
            Socket socket = new Socket("***", 9090);
            InputStream is = socket.getInputStream();
            OutputStream os = socket.getOutputStream();

            final BufferedReader br = new BufferedReader(new InputStreamReader(is));
            BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));

            /*
                实现:
                通过 reader,
                在任何时候 能够读到 Server端 发来的数据
                通过 writer,
                在任何时候 能够向 Server端 去写数据
             */
            //在等待客户端 发送消息过来的话,这里是需要阻塞的,
            // 阻塞的时候又没有办法向客户端发送数据,所以读写独立的话,肯定是要起线程的

            //起一个线程,专门用于
            // 读Server 端 发来的数据,数据一过来就读然后输出,
            // 输出服务端发送的数据
            new Thread() {
                @Override
                public void run() {

                    try {
                        String line = null;
                        while ((line = br.readLine()) != null) {
                            System.out.println(line);
                        }
                    } catch (IOException e) {
                    }
                }
            }.start();

            //给Server端 发送数据
            while (true) {
                //next() 是阻塞的,不断地读控制面板,有数据就会通过bufferWriter,
                // 即outputStream 写给Server
                String msg = mScanner.next();
                bw.write(msg);
                bw.newLine();
                bw.flush();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new TcpClient().start();
    }
}
  • 反复测试:

移植客户端到Android移动端

复制TcpClient到biz包下迭代,名为TcpClientBiz:

代码语言:javascript
复制
  • rename一下MainActivity为UdpActivity:

复制UdpActivity一份,原地粘贴,命名为TcpActivity:

代码语言:javascript
复制

更改启动页面:

反复测试(一个模拟机和两台真机的聊天哈哈哈):

最终Server端聊天记录:

服务端诸类代码实现概述(TcpServer、ClientTask、MsgPool)
  • TcpServer: 死循环,阻塞,等待客户端请求连接,while (true) & .accept(); 一旦连接上,获取对应的socket对象并 把它丢给ClientTask的构造方法,new ClientTask(socket) 直接新建一个子线程,去处理这个socket(.start()), 将连接上的socket 对应的线程类,注册到消息队列类中的队列中, 成为消息队列的观察者;MsgPool.getInstance().addMsgComingListener(clientTask) 启动消息队列读读队列的线程, MsgPool.getInstance().start();
  • ClientTask: public class ClientTask extends Thread implements MsgPool.MsgComingListener 让线程类作为消息队列的观察者, 负责接收被观察者的通知信息并做socket 通信; 类中:
代码语言:txt
复制
- 1/3 构造方法: 接收TcpServer对过来的socket对象,
 用之初始化其IO流;
    - 2/3 
代码语言:txt
复制
- **实现单例模式** 
客户端诸类代码实现概述(TcpClientBiz、TcpActivity)
  • TcpClientBiz: 连接Server端, 后台子线程不断接收Server端发送过来的信息, 前台封装、提供向Server端发送信息的方法
代码语言:txt
复制
- 准备一个绑定了`mainLooper`的`Handler`
- **定义****`<回调机制>`** 
代码语言:txt
复制
    - **开启子线程!!!,** 

开启一个子线程,

拿着全局变量I流,封装成BufferReader,

死循环 阻塞等待 读取Server端信息

while ((line = br.readLine()) != null)

一旦有信息,

借助Handler.post(),

使用全局 回调接口变量抽象调用接口方法**onMsgComing()**

通过回调机制交给Activity层处理;

代码语言:txt
复制
- `sendMsg(final String msg)` 开启一个子线程,
 拿着全局变量O流,封装成BufferWriter,
 把参数msg 写入BufferWriter(O流),发送给Server端;
 调用时机:在要发送消息给Server 的时候调用,
 一般是EditText 右边的按钮被点击的时候
    - onDestroy():
 回收socket、IO流
代码语言:txt
复制
- **需要实例化一个全局TcpClientBiz实例** 

所有的客户端都可发送消息到队列中, 然后所有的客户端都在等待 消息队列的消息新增(mQueue.put())这个时刻, 消息队列一新增消息, 即一接收到某个客户端发送过来消息(mQueue.put()), 则消息都会一次性转发给所有客户端, 所以这里涉及到一个观察者设计模式, 消息队列(MsgPool)或消息(Msg)是被观察者, 所有客户端处理线程(ClientTask)都是观察者 观察者模式实现小结: 观察者接口准备更新(数据或UI的)方法; 被观察者接口准备三个抽象方法; 观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据; 被观察者实现类准备一个观察者List以及实现三个方法: 1.观察者注册方法: 参数为某观察者,功能是把观察者参数加到观察者List中; 2.注销观察者方法: 参数为某观察者,功能是把观察者参数从观察者List中移除; 3.通知观察者方法:无参数或者把需要通知的数据作为参数, 功能是遍历所有已注册的观察者, 即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者! 使用时, 实例化一个被观察者和若干个观察者, 将所有观察者注册到被观察者处, 调用被观察者的通知方法,一次性更新所有已注册的观察者!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 案例GitHub地址
  • 创建TCP服务端
  • 创建TCP客户端
  • 移植客户端到Android移动端
    • 服务端诸类代码实现概述(TcpServer、ClientTask、MsgPool)
      • 客户端诸类代码实现概述(TcpClientBiz、TcpActivity)
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档