在Openfire上弄一个简单的推送系统

推送系统

说是推送系统有点大,其实就是一个消息广播功能吧。作用其实也就是由服务端接收到消息然后推送到订阅的客户端。

思路

对于推送最关键的是服务端向客户端发送数据,客户端向服务端订阅自己想要的消息。这样的好处就是有消息后才向客户端推送,相比于拉取数据不会产生许多无效的查询,实时性也高。

xmpp这种即时通信协议基于TCP长连接还是比较符合这种场景的。只需要在服务端增加一个模块用于接收用户订阅与数据的推送就完成了主体功能。

在xmpp协议里可以扩展组件,这样我们写一个组件,然后连接到xmpp服务器,这样就可以应用于不同的xmpp服务器。

准备工作

主要的环境

因为我比较熟悉openfire的体系,所以自然就用它。客户端暂时没有特别的需求,只是用于接收数据,所以用smack或者任何一款xmpp 客户端都可以。我为了简单就用smack写一个简单的代码。

需要用到的jar包

用到的了whack的core,在maven工程里直接引用即可,相关的依赖包会自动加载进来

<dependency>
  <groupId>org.igniterealtime.whack</groupId>
  <artifactId>core</artifactId>
  <version>2.0.1-SNAPSHOT</version>
  <type>jar</type>
</dependency>

核心模块

推送服务

推送服务就是等待或者获得需要推送的消息数据后向用户广播出去的服务。因为这里暂时没有设定数据的场景,所以就简单的用一个阻塞队列来表示。步骤:

  1. 数据通过推送接口写入到推送服务
  2. 推送服务将数据写入到消息队列
  3. 发送线程检测到消息后取出并发给订阅的客户端

在此我写了一个PushServer的类用于表示推送服务,这个类里包含了:

  1. 一个消息队列
  2. 一个发送线程
  3. 一个订阅列表
  4. 以及一些发送相关的xmpp组件

消息队列

//消息列表
private BlockingQueue<Packet> packetQueue;

使用到了生产者消费者模式,所以用了一个阻塞队列,用于存放等待发送的消息数据。

发送线程

private class PacketSenderThread extends Thread {
		private volatile Boolean shutdown = false;
		private BlockingQueue<Packet> queue;
		private Component component;
		private ComponentManager componentManager;

		public PacketSenderThread(ComponentManager componentManager, Component component, BlockingQueue<Packet> queue) {
			this.componentManager = componentManager;
			this.component = component;
			this.queue = queue;
		}
		
		public void run() {
			while (!shutdown) {
				Packet p;
				try {
					p = queue.take();
					componentManager.sendPacket(component, p);
				} catch (InterruptedException e1) {
					System.err.println(e1.getStackTrace());
				} catch (ComponentException e) {
					e.printStackTrace();
				}
			}
		}
		
		public void shutdown() {
			shutdown = true;
			this.interrupt();
		}
		
	}

这个线程继承了Thread,线程的功能很简单,就是一直从queue中获得消息,因为是阻塞的队列,所以没有消息时会阻塞,一旦有消息就会执行发送sendPacket将包发送出去。

这里使用到了componentManager,这个是openfire实现的一个组件管理类,通过这个类的对象可以发送xmpp数据包。

增加shutdown方法,使得线程可以在外部进行退出操作。

订阅列表

//订阅列表
	private Set<JID> subscriptions;
	
	public synchronized void subscription(JID jid) {
		subscriptions.add(jid);
	}
	
	public synchronized void unsubscription(JID jid) {
		subscriptions.remove(jid);
	}

只有订阅了这个推送服务的客户端才会进行推送操作,这里的代码就是用于订阅与退订操作。用了一个HashSet来存储。

xmpp组件

public class PushComponent extends AbstractComponent{
	
	public PushComponent() {
	}

	@Override
	public String getDescription() {
		return "用于消息推送服务组件,主要功能就是将消息转发给具体的客户端,实现消息中转的功能";
	}

	@Override
	public String getName() {
		return "pusher";
	}
	
	@Override
	protected void handleMessage(Message message) {
	}

}

public class PushManager {
	private static PushManager _instance = new PushManager();
	private Map<String, PushServer> pushServers;
	private ExternalComponentManager manager;
	
	private PushManager() {
		pushServers = new ConcurrentHashMap<String, PushServer>();
		manager = new ExternalComponentManager("192.168.149.214", 5275);
        manager.setSecretKey("push", "test");
        manager.setMultipleAllowed("push", true);		
	}

	public static PushManager getInstance() {
		return _instance;
	}
	
	public void init() {
        try {
    		//初始化PushServer
    		PushServer pushSvr = new PushServer("push", manager);
    		pushServers.put("push", pushSvr);
    		//注册Component到xmpp服务器
    		manager.addComponent(pushSvr.getPushDomain(), pushSvr.getComp());
        } catch (ComponentException e) {
            e.printStackTrace();
        }
	}
	
	public PushServer getPushServer(String pushDomain) {
		return pushServers.get(pushDomain);
	}
}

这里的PushComponent就是一个xmpp组件,相当于一个扩展模块,可以接收消息并处理消息,也就是自己写一些和xmpp相关的业务功能。

PushManager就是管理组件并连接到xmpp服务器的一个类。

服务端启动

public class App 
{
    public static void main( String[] args )
    {
		PushManager.getInstance().init();
		//推送消息
		PushServer ps = PushManager.getInstance().getPushServer("push");
		ps.start();

		JID client1 = new JID("1twja8e8yr@domain/1twja8e8yr");
		ps.subscription(client1);
		try {
			for (Integer i = 0; i< 200; i++) {
				ps.putPacket("推送消息200:" + i.toString());
				Thread.sleep(1);
			}
            Thread.sleep(5000);
		} catch (InterruptedException e1) {
			e1.printStackTrace();
		}
		ps.stop();
        System.out.println("go die");
    }
}

这段代码模拟了服务的启动,同时为了简化功能这里直接添加了一个订阅用户。

客户端

public class TestAnonymous {
	public static void main(String[] args) {
		AbstractXMPPConnection connection = SesseionHelper.newConn("192.168.149.214", 5223, "domain");
		try {
			connection.login();//匿名登录
			
			connection.addAsyncStanzaListener(new StanzaListener() {
				
				@Override
				public void processPacket(Stanza packet) throws NotConnectedException {
					System.out.println((new Date()).toString()+ ":" + packet.toXML());
				}
			}, new StanzaFilter() {
				@Override
				public boolean accept(Stanza stanza) {
					return stanza instanceof Message;
				}
			});
		} catch (XMPPException | SmackException | IOException e) {
			e.printStackTrace();
		}

		while (true) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
	}
}

客户端代码启动一个xmpp连接,然后登录到服务器,同时订阅消息,将收到的消息print出来。

整个过程就完成了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏吾爱乐享

CentOS7安装mysql提示“No package mysql-server available

1804
来自专栏linux系统运维

原 主动模式和被动模式,添加监控主机,添加

1632
来自专栏一个爱瞎折腾的程序猿

初次尝试Linux并记录一二

若出现 服务器拒绝了SETP连接,但它监听FTP链接。。。没有安装sshd 解决方案

1191
来自专栏搜云库

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种...

4236
来自专栏androidBlog

Git 配置别名 —— 让命令变得更简单

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/gdutxiaoxu/article/de...

1401
来自专栏雨过天晴

原 PHPStrom 9系列激活码

1944
来自专栏JavaEdge

2018-08-02

hibernate执行更新需要较长时间,因此需要等待,否则无法获得更新后字段

942
来自专栏python读书笔记

python 数据分析基础 day7-xlrd,xlwt读写多个excel通过xlrd和xlwt读多个excel文件并写入一个新excel文件

今天总结的内容为通过xlrd和xlwt模块将读取的多个excel文件中多个工作表输出至多个excel文件中。 通过xlrd和xlwt读多个excel文件并写入一...

3708
来自专栏KaliArch

Kafka-manager部署

1.1 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

3605
来自专栏电光石火

修改Centos服务器主机名称

Centos服务器安装好之后,默认的主机名为:localhost.localdomain,为了便与管理,我们需要对服务器主机名称进行修改,此修改生效涉及到两个配...

1912

扫码关注云+社区

领取腾讯云代金券