https://mp.weixin.qq.com/s/RsvI5AFzbp3rm6sOlTmiYQ
3700G
免费学习资料、或者加入技术交流群(禁止发广告),可以文末+我微信,专注技术不闲聊08 65 12 06 48 65 6C 6C 6F 77
31 30 31 3C 2F 69 64 3E 3C 6E 61 6D 65 3E 68 65
6C 6C 6F 3C 2F 6E 61 6D 65 3E 3C 2F 68 65 6C 6C
6F 77 6F 72 6C 64 3E
yarn add protobufjs -D
mkdir proto
cd proto
vi message.proto
....
//message.proto文件
package message;
option optimize_for = LITE_RUNTIME;
message Account{
required string accountName = 1;
required string pwd = 2;
}
message AccountList{
required int32 index = 1;
repeated Account list = 2;
}
protobufjs
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
for (let i = 0; i < 5;i++) {
const accountObj = Account.create();
accountObj.accountName = "前端巅峰" + i;
accountObj.pwd = "Peter酱要比技术胖还骚" + i;
accountListObj.list.push(accountObj);
}
const buffer = AccountList.encode(accountListObj).finish();
console.log(buffer)
const ProtoBufJs = require("protobufjs");
const root = ProtoBufJs.loadSync("./proto/message.proto");
const AccountList = root.lookupType("message.AccountList");
const Account = root.lookupType("message.Account");
const accountListObj = AccountList.create();
const accountObj = Account.create();
accountObj.accountName = "前端巅峰";
accountObj.pwd = "Peter酱要比技术胖还骚";
accountObj.test = "大保健越做身体越虚是为什么";
accountListObj.list.push(accountObj);
const buffer = AccountList.encode(accountListObj).finish();
console.log(buffer.toString());
socket
通信,由于是实现redis
,这里不使用udp
通讯,而是基于可靠的TCP
,先编写redis
服务端代码const net = require("net");
const listenPort = 6380; //监听端口
const server = net
.createServer(function (socket) {
// 创建socket服务端
console.log("connect: " + socket.remoteAddress + ":" + socket.remotePort);
socket.setKeepAlive(true);
socket.setEncoding("utf-8");
//接收到数据
socket.on("data", function (data) {
console.log("client send:" + data);
});
socket.write("Hello client!\r\n");
//数据错误事件
socket.on("error", function (exception) {
socket.end();
});
//客户端关闭事件
socket.on("close", function (data) {
});
})
.listen(listenPort);
//服务器监听事件
server.on("listening", function () {
console.log("server listening:" + server.address().port);
});
//服务器错误事件
server.on("error", function (exception) {
console.log("server error:" + exception);
});
redis
默认端口6379
,我们监听在6380
端口,心跳保活,应用层做keep-alive
,socket.setKeepAlive(true)
,保持长链接Socket
通信const { Socket } = require("net");
//其他引入pb文件的代码不变
pb
文件的代码不变,客户端一份,服务端一份,双工通讯两边pb
文件都要各自有一份const port = 6380;
const host = "127.0.0.1";
const client = new Socket();
client.setKeepAlive(true);
client.setEncoding("utf-8");
//连接到服务端
client.connect(port, host, function () {
client.write("hello server");
//向端口写入数据到达服务端
});
client.on("data", function (data) {
console.log("from server:" + data);
//得到服务端返回来的数据
});
client.on("error", function (error) {
//错误出现之后关闭连接
console.log("error:" + error);
client.destory();
});
client.on("close", function () {
//正常关闭连接
console.log("Connection closed");
});
socket
链接6380
端口服务器,建立长链接package message;
syntax = "proto3";
message PingPong {
string message_type = 1; // 会变成 messageType
string ping = 2;
string pong = 3;
}
const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
setInterval(() => {
const payload = { message_type: '1', ping: '1', pong: '2' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
const root = ProtoBufJs.loadSync('./proto/message.proto');
const PingPong = root.lookupType('message.PingPong');
const server = createServer(function (socket) {
socket.setKeepAlive(true);
// 创建socket服务端
//接收到数据
socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage, 'decodedMessage');
});
socket.write('Hello client!\r\n');
//数据错误事件
socket.on('error', function (exception) {
console.log('socket error:' + exception);
socket.end();
});
//客户端关闭事件
socket.on('close', function (data) {
console.log('client closed!');
});
}).listen(listenPort);
timer = setInterval(() => {
count++;
const payload = { messageType: '1', ping: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage,'decodedMessage')
if(decodedMessage.messageType ==='1'){
console.log('进入判断')
const payload = { messageType: '1', pong: '1'};
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
}
});
client.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
if (decodedMessage.messageType === '1') {
console.log('收到心跳回包');
count = 0;
}
console.log('from server:' + decodedMessage.messageType);
//得到服务端返回来的数据
});
timer = setInterval(() => {
if (count > 3) {
clearInterval(timer);
client.end();
throw Error('timeout')
}
count++;
const payload = { messageType: '1', ping: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
client.write(buffer);
}, 3000);
socket.write(buffer);
socket
链接Map
类型存储PB
协议ACK
Payload
pb字段message Data {
string message_type = 1; // 会变成 messageType
Payload data = 2;
}
message Payload {
required string key = 1;
required string value =2;
}
const Data = root.lookupType('message.Data');
function RedisSet() {
const msg = { messageType: '2', data: { key: '1', value: '2' } };
const errMsg = Data.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Data.create(msg);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
socket.on('data', function (data) {
const decodedMessage = PingPong.decode(data);
console.log(decodedMessage,'decodedMessage');
if(decodedMessage.messageType ==='1'){
const payload = { messageType: '1', pong: '1'};
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
}
});
message Common {
string message_type = 1;
}
common
,得到messageType后再进行处理,再反序列化一次 socket.on('data', function (data) {
const res = Common.decode(data);
if (res.messageType=== '1') {
const payload = { messageType: '1', pong: '1' };
const errMsg = PingPong.verify(payload);
if (errMsg) throw Error(errMsg);
const message = PingPong.create(payload);
const buffer = PingPong.encode(message).finish();
socket.write(buffer);
} else if(res.messageType=== '2'){
const message = Data.decode(data)
const payload = message.data;
console.log(payload.key,'payload');
M.set(payload.key,payload.value);
console.log(M,'m')
}
});
RedisGet
方法:const M = new Map();
M.set('1','Peter酱牛逼')
function RedisGet() {
const msg = { messageType: '3', data: { key: '1' } };
const errMsg = Data.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Data.create(msg);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
else if (res.messageType === '3') {
const message = Query.decode(data);
const res = M.get(message.key);
console.log(res, 'res');
}
promise | generator|async
)const cbQueue = []; //回调队列
const setQueue = []; //set的队列
const getQueue = []; //get的队列
cb
,改造redisSetfunction RedisSet(data, cb) {
cbQueue.push(cb);
setQueue.push(data);
console.log(cbQueue, setQueue, "queue");
const errMsg = Data.verify(data);
if (errMsg) throw Error(errMsg);
const message = Data.create(data);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
else if (res.messageType === '2') {
const message = Data.decode(data);
const payload = message.data;
M.set(payload.key, payload.value);
}
message setSucceed {
string message_type = 1;
}
const msg = { messageType: "5" };
const errMsg = setSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = setSucceed.create(msg);
const buffer = setSucceed.encode(m).finish();
socket.write(buffer);
RedisSet(data, () => {
console.log("set成功,触发cb");
});
else if (decodedMessage.messageType === "5") {
const cb = cbQueue.shift();
cb && cb();
}
ACK
,触发cbQueue中的cb(此时将cbQueue数组类型改成Map,方便处理),触发完成后remove掉cb
即可yarn add node-uuid
const uuid = require('node-uuid');
// v1 根据时间戳和随机数生成的uuid
const creatuuid= uuid.v1()
message Data {
string message_type = 1; // 会变成 messageType
string uuid = 2;
Payload data = 3;
}
function RedisSet(data, cb) {
// v1 根据时间戳和随机数生成的uuid
const creatuuid = uuid.v1();
data.uuid = creatuuid;
cbQueue.set(creatuuid, cb);
const errMsg = Data.verify(data);
if (errMsg) throw Error(errMsg);
const message = Data.create(data);
const buffer = Data.encode(message).finish();
client.write(buffer);
}
else if (res.messageType === '2') {
const message = Data.decode(data);
const payload = message.data;
M.set(payload.key, payload.value);
const msg = { messageType: '5', uuid: message.uuid };
const errMsg = setSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = setSucceed.create(msg);
const buffer = setSucceed.encode(m).finish();
socket.write(buffer);
}
cb
else if (decodedMessage.messageType === '5') {
const res = setSucceed.decode(data);
const cb = cbQueue.get(res.uuid);
cb && cb() && cbQueue.remove(res.uuid);
}
message Query {
string message_type = 1;
string key = 2;
string uuid =3;
}
get = function (key, cb) {
// v1 根据时间戳和随机数生成的uuid
const creatuuid = uuid.v1();
getCbQueue.set(creatuuid, cb);
const msg = { messageType: '6', key, uuid: creatuuid };
const errMsg = Query.verify(msg);
if (errMsg) throw Error(errMsg);
const message = Query.create(msg);
const buffer = Query.encode(message).finish();
TCPClient.write(buffer);
};
else if (res.messageType === "6") {
const message = Query.decode(data);
const res = M.get(message.key);
const msg = { messageType: "6", uuid: message.uuid, data: res };
const errMsg = getSucceed.verify(msg);
if (errMsg) throw Error(errMsg);
const m = getSucceed.create(msg);
const buffer = getSucceed.encode(m).finish();
socket.write(buffer);
}
else if (decodedMessage.messageType === '6') {
const res = getSucceed.decode(data);
const cb = getCbQueue.get(res.uuid);
cb && cb(res.data);
getCbQueue.delete(res.uuid);
}
const Redis = require('./redis');
const port = 6380;
const host = '127.0.0.1';
const RedisStore = Redis.connect(port, host);
const data = { messageType: '2', data: { key: '1', value: '2' } };
RedisStore.set(data, () => {
console.log('set成功,触发cb');
});
RedisStore.get('1', (data) => {
console.log('get成功data:', data);
});
cluster
源码解析,用pm2 docker谁都会,但是真的要自己实现,还是要思考一下https://segmentfault.com/a/1190000021230376
pm2 start server.js
M.set(payload.key, payload.value);
fs.appendFile(
'./redis.db',
`${payload.key},${payload.value}\n`,
(error) => {
if (error) return console.log('追加文件失败' + error.message);
console.log('追加成功');
}
);
Unicode: U+200B HTML: ​
Unicode: U+200C HTML: ‌
Unicode: U+200D HTML: ‍
Unicode: U+200E HTML: ‎ ‎ 或‎
Unicode: U+200F HTML: ‏ ‏ 或‏
Unicode: U+FEFF
\u200b\
,因为它够2b `${payload.key},${payload.value}\u200b\n`,
//服务器监听事件
server.on('listening', function () {
fs.readFile('./redis.db', (err, data) => {
console.log(data.toString(), 'xxx');
});
console.log('server listening:' + server.address().port);
});
`${payload.key}-${payload.value}\u200b`,
//服务器监听事件
server.on('listening', function () {
fs.readFile('./redis.db', (err, data) => {
const string = data.toString();
if (string.length > 0) {
const result = string.split('\u200b');
for (let i = 0; i < result.length; i++) {
const res = result[i];
for (let j = 0; j < res.length; j++) {
if (res[j] === '-') {
continue;
}
j === 0 ? M.set(res[j], null) : M.set(res[j - 2], res[j]);
}
}
}
});
console.log('server listening:' + server.address().port);
});