/* Minified stylesheet scoped to `.markdown-body`: base typography, headings (h1-h6), paragraphs, images, horizontal rules, inline code and pre blocks, links, tables, blockquotes, ordered/unordered/task lists, plus a `max-width:720px` media query that sets smaller h1-h3 font sizes. */
.markdown-body{word-break:break-word;line-height:1.75;font-weight:400;font-size:15px;overflow-x:hidden;color:#333}.markdown-body h1,.markdown-body h2,.markdown-body h3,.markdown-body h4,.markdown-body h5,.markdown-body h6{line-height:1.5;margin-top:35px;margin-bottom:10px;padding-bottom:5px}.markdown-body h1{font-size:30px;margin-bottom:5px}.markdown-body h2{padding-bottom:12px;font-size:24px;border-bottom:1px solid #ececec}.markdown-body h3{font-size:18px;padding-bottom:0}.markdown-body h4{font-size:16px}.markdown-body h5{font-size:15px}.markdown-body h6{margin-top:5px}.markdown-body p{line-height:inherit;margin-top:22px;margin-bottom:22px}.markdown-body img{max-width:100%}.markdown-body hr{border:none;border-top:1px solid #ddd;margin-top:32px;margin-bottom:32px}.markdown-body code{word-break:break-word;border-radius:2px;overflow-x:auto;background-color:#fff5f5;color:#ff502c;font-size:.87em;padding:.065em .4em}.markdown-body code,.markdown-body pre{font-family:Menlo,Monaco,Consolas,Courier New,monospace}.markdown-body pre{overflow:auto;position:relative;line-height:1.75}.markdown-body pre>code{font-size:12px;padding:15px 12px;margin:0;word-break:normal;display:block;overflow-x:auto;color:#333;background:#f8f8f8}.markdown-body a{text-decoration:none;color:#0269c8;border-bottom:1px solid #d1e9ff}.markdown-body a:active,.markdown-body a:hover{color:#275b8c}.markdown-body table{display:inline-block!important;font-size:12px;width:auto;max-width:100%;overflow:auto;border:1px solid #f6f6f6}.markdown-body thead{background:#f6f6f6;color:#000;text-align:left}.markdown-body tr:nth-child(2n){background-color:#fcfcfc}.markdown-body td,.markdown-body th{padding:12px 7px;line-height:24px}.markdown-body td{min-width:120px}.markdown-body blockquote{color:#666;padding:1px 23px;margin:22px 0;border-left:4px solid #cbcbcb;background-color:#f8f8f8}.markdown-body blockquote:after{display:block;content:""}.markdown-body blockquote>p{margin:10px 0}.markdown-body ol,.markdown-body 
ul{padding-left:28px}.markdown-body ol li,.markdown-body ul li{margin-bottom:0;list-style:inherit}.markdown-body ol li .task-list-item,.markdown-body ul li .task-list-item{list-style:none}.markdown-body ol li .task-list-item ol,.markdown-body ol li .task-list-item ul,.markdown-body ul li .task-list-item ol,.markdown-body ul li .task-list-item ul{margin-top:0}.markdown-body ol ol,.markdown-body ol ul,.markdown-body ul ol,.markdown-body ul ul{margin-top:3px}.markdown-body ol li{padding-left:6px}@media (max-width:720px){.markdown-body h1{font-size:24px}.markdown-body h2{font-size:20px}.markdown-body h3{font-size:18px}}
/**
 * LiveMQ — base message class implementing a simple FIFO queue.
 *
 * Consumption is two-step: `dequeue()` only peeks at the head, and
 * `confirm()` removes it. Subclasses may replace `queue` with a Proxy-wrapped
 * array (built from `handler`) to react to mutations.
 */
class LiveMQ {
  /**
   * Queue storage. Starts as a plain empty array so the base class is usable
   * on its own; `clear()` swaps in a fresh array proxied through `handler`.
   */
  public queue: any[] = [];

  /** Message-processing callback. Declared here but not invoked by this class. */
  public callback!: (message: any) => void;

  /** Proxy handler applied to the queue array by `clear()`; empty (no traps) by default. */
  public handler: ProxyHandler<any[]> = {};

  constructor() {}

  /**
   * Appends every argument to the tail of the queue, in order.
   * Calling it with no arguments is a no-op.
   *
   * @param items Messages to append.
   */
  public enqueue(...items: any[]): void {
    // Push one element at a time so a proxied queue sees one index write and
    // one length write per message, exactly as callers of push(item) expect.
    for (const item of items) {
      this.queue.push(item);
    }
  }

  /**
   * Peeks at the head of the queue without removing it.
   *
   * @returns The head message, or an `Error("error")` instance (returned, not
   *          thrown) when the head is `undefined`, e.g. the queue is empty.
   */
  dequeue(): any {
    const head = this.queue[0];
    return typeof head !== "undefined" ? head : new Error("error");
  }

  /** Confirms consumption: removes the head element from the queue. */
  confirm(): void {
    this.queue.splice(0, 1);
  }

  /** @returns `true` when the queue holds no messages. */
  isEmpty(): boolean {
    return this.queue.length === 0;
  }

  /** @returns The number of messages currently queued. */
  size(): number {
    return this.queue.length;
  }

  /** Empties the queue by replacing it with a new array proxied through `handler`. */
  clear(): void {
    this.queue = new Proxy<any[]>([], this.handler);
  }

  /** @returns The live queue array itself (not a copy). */
  show(): any[] {
    return this.queue;
  }
}
// ---- end of LiveMQ listing ----
/**
 * LiveHandleMQ — ordered message queue processor.
 *
 * Messages are handed to `callback` one at a time, in arrival order. The
 * queue array is wrapped in a Proxy; a write to its `length` while no message
 * is being processed starts consumption. A failing callback is re-run up to
 * `retry` additional times via the Rx `retry` operator; after that the message
 * is confirmed (dropped) and processing moves on to the next one.
 *
 * Relies on the global `window.Rx` (RxJS 5-style `Observable.create(...).retry`).
 */
class LiveHandleMQ extends LiveMQ {
  /** True while a message is being processed; blocks re-entrant consumption. */
  private lock = false;
  /** Number of times a failed callback is retried. */
  private retry: number;
  /** Observable wrapping the processing of the current head message. */
  private observer: any;
  /** Subscription to `observer`; released by `destroy()`. */
  private subscription: any;

  /** Proxy handler that starts consumption when the queue length is written. */
  public handler: ProxyHandler<any[]> = {
    set: (target, key, value, receiver) => {
      // A length write to a positive value while idle means new data arrived.
      if (!this.lock && key === "length" && value > 0) {
        this.subscribe();
      }
      return Reflect.set(target, key, value, receiver);
    },
  };

  /**
   * @param callback Handler invoked with each message; may return a Promise,
   *                 which is awaited before the next message is processed.
   * @param retry    Non-negative integer retry count (default 0). An invalid
   *                 value is reported with `console.error` and treated as 0 so
   *                 the instance is still fully initialised and usable.
   */
  constructor(callback: (arg: any) => void, retry: number = 0) {
    super();
    let retryCount = retry;
    // Validate the retry count; fall back instead of leaving the queue unset.
    if (!Number.isInteger(retry) || retry < 0) {
      console.error("retry is not legitimate");
      retryCount = 0;
    }
    this.callback = callback;
    this.retry = retryCount;
    // Wrap the queue in a Proxy to intercept mutations.
    this.queue = new Proxy<any[]>([], this.handler);
  }

  /** Locks the queue and processes the current head message, with retries. */
  private subscribe(): void {
    this.lock = true;
    const Rx = (window as any).Rx;
    this.observer = Rx.Observable.create((observer: any) => {
      // Run the async work inside a synchronous subscribe function so that no
      // Promise is returned to Rx as the teardown value.
      const run = async (): Promise<void> => {
        try {
          await this.callback(this.dequeue());
          observer.next("");
          observer.complete();
        } catch (error) {
          console.log("出错了重试");
          observer.error(error);
        }
      };
      void run();
    }).retry(this.retry);
    this.subscription = this.observer.subscribe({
      next: () => {
        this.next();
      },
      // Retries exhausted: still advance so one bad message cannot stall the queue.
      error: () => {
        this.next();
      },
    });
  }

  /**
   * Advances the queue: confirms the message just handled, then either
   * processes the next one or releases the lock when the queue is empty.
   */
  private next(): void {
    // Confirm consumption of the head message.
    this.confirm();
    if (!this.isEmpty()) {
      this.subscribe();
    } else {
      this.lock = false;
    }
  }

  /** Releases the current Rx subscription, if any. */
  destroy(): void {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
  }
}
// ---- end of LiveHandleMQ listing ----
/**
 * LiveCollectionMQ — time-window collection queue.
 *
 * Every element written into the queue is emitted as an internal "notify"
 * event. Events are grouped with the Rx `bufferTime` operator, and each
 * non-empty group is forwarded as ONE message (an array) to an inner
 * `LiveHandleMQ`, which invokes `callback` with that array.
 *
 * Relies on the globals `window.Rx` and `window.mitt`.
 */
class LiveCollectionMQ extends LiveMQ {
  /** Internal event emitter created from the global `mitt` factory. */
  private emitter = (window as any).mitt();
  /** Length of the collection window passed to `bufferTime`. */
  private bufferTime: number;
  /** Observable of internal "notify" events. */
  private observer: any;
  /** Subscription to the buffered observable; released by `destroy()`. */
  private subscription: any;
  /** Inner ordered queue that processes each collected batch. */
  private mq: any;

  /** Proxy handler that emits "notify" for every element written to the queue. */
  public handler: ProxyHandler<any[]> = {
    set: (target, key, value, receiver) => {
      // Only numeric index writes carry data; the typeof guard keeps symbol
      // keys away from Number(), which would throw a TypeError for them.
      if (typeof key === "string" && !isNaN(Number(key))) {
        this.emitter.emit("notify", value);
      }
      return Reflect.set(target, key, value, receiver);
    },
  };

  /**
   * @param callback   Handler invoked with each collected batch (an array of
   *                   the values enqueued during one window).
   * @param bufferTime Positive integer window length (default 1000). An
   *                   invalid value is reported with `console.error` and
   *                   replaced by 1000 so the instance is fully initialised.
   */
  constructor(callback: (arg: any) => void, bufferTime: number = 1000) {
    super();
    let windowLength = bufferTime;
    if (!Number.isInteger(bufferTime) || bufferTime <= 0) {
      console.error("bufferTime is not legitimate");
      windowLength = 1000;
    }
    this.mq = new LiveHandleMQ(callback);
    this.bufferTime = windowLength;
    // NOTE(review): elements pushed into this queue are never removed by this
    // class, so it grows for the lifetime of the instance — confirm intended.
    this.queue = new Proxy<any[]>([], this.handler);
    // Bridge the internal emitter into an observable of "notify" events.
    const Rx = (window as any).Rx;
    this.observer = Rx.Observable.fromEventPattern(
      (h: any) => {
        this.emitter.on("notify", h);
      },
      (h: any) => {
        this.emitter.off("notify", h);
      }
    );
    this.subscription = this.observer
      .bufferTime(this.bufferTime)
      .subscribe((messages: any[]) => {
        // Skip empty windows; forward the whole batch as a single message.
        if (messages.length > 0) {
          this.mq.enqueue(messages);
        }
      });
  }

  /** Releases the buffered subscription and destroys the inner queue. */
  destroy(): void {
    if (this.subscription) {
      this.subscription.unsubscribe();
    }
    this.mq.destroy();
  }
}
// ---- end of LiveCollectionMQ listing ----
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Declare the encoding so the non-ASCII button label renders correctly. -->
<meta charset="utf-8">
</head>
<body>
<!-- Demo page: each click enqueues an incrementing counter into a LiveCollectionMQ. -->
<button type="button" id="xxx">点我发消息</button>
<script src="https://unpkg.com/@reactivex/rxjs@5.0.0-beta.1/dist/global/Rx.umd.js"></script>
<script src="https://unpkg.com/mitt/dist/mitt.umd.js"></script>
<script src="live.js"></script>
<script>
// Asynchronous handler: settles after a random delay of up to 10 seconds.
function test(mes, observer) {
return new Promise((resolve, reject) => {
let time = Math.ceil(Math.random() * 10000);
console.log("time", time, mes);
setTimeout(() => {
// The condition is hard-coded to false, so this demo handler always rejects.
if (false) {
resolve();
} else {
reject();
}
}, time);
});
}
// Plain synchronous handler that just logs the message.
function test1(mes) {
console.log(mes);
}
var count = 0;
// var queue = new LiveHandleMQ(test, 3);
// Create the queue instance: batches are collected over a 10000 window.
var queue = new LiveCollectionMQ(test, 10000);
document.getElementById("xxx").addEventListener("click", function () {
count++;
// Put the data into the queue.
queue.enqueue(count);
if (count > 10) {
// Lifecycle teardown hook provided by the queue.
queue.destroy();
}
});
</script>
</body>
</html>
<!-- end of demo page listing -->