前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何解决node进程间共享内存

如何解决node进程间共享内存

作者头像
用户2436820
发布2020-03-20 12:17:49
2.7K0
发布2020-03-20 12:17:49
举报
文章被收录于专栏:奔跑的蛙牛技术博客

[toc]

npm i @runnersnail/cache-machine

利用rust帮助node进程间共享内存

业务场景:调用算法接口,算法5分钟后得到数据然后调用node接口返回数据,此时node接口接收数据并把数据缓存,用户端访问node无论哪个进程都可以得到被缓存的数据

将解决问题的思路和方法记录下来

遇到问题

由于部署平台对于node程序部署采用pm2(所有进程都是fork出来的,就不可能利用node那一套进程进程间通讯)

也曾经考虑自己实现一套cluster,然后利用master进程通讯。但pm2有其他优秀的功能宕机重启,cpu,内存监控等

分析问题

论坛请教有什么进程间通讯(受限于pm2)的方式,大部分的回答都是直接memcache、redis,感觉为了缓存某一轻量数据就上redis个人感觉没有太大意义,会造成资源浪费和部署麻烦。

解决问题

解决这个问题我们需要了解进程间有哪些通讯方式,才能寻找更好的解决方案。

详细了解请戳

  • 管道pipe:管道是一种半双工的通信方式,数据只能单向流动
  • 消息队列:消息队列是由消息的链表,存放在内核中并由消息队列标识符标识
  • socket: 很常用的方式不再赘述
  • 共享存储SharedMemory: 映射一段可以被不同内存访问的地址块

为何采用shared memory帮助node共享内存

分析我们的业务场景,其实就是某一进程得到数据缓存到内存,然后其他进程可以无视跨进程读取缓存的数据块,说一shared memory是最适合的实用场景

如何使用shared memory 快速解决问题

node本身是不支持shared memeory这种底层操作的,我必须借助底层语言的能力去实现,然后通过ffi调用。为了避免自己实现原剩代码操作内存,我们需要借助一些三方成熟的包 所以我们需要完成以下三个事情

  • 选择一门系统语言
  • 寻找一个成熟的三方包共享内存
  • 寻找ffi工具快速完成
  • 这里系统语言我选择rust,如今前端火热的Deno项目采用rust编写,rust已经变的更靠近web社区
  • 选择Rust的第二个原因是它的三方包类似于npm一样容易集成,挑选shared memory模块 shared_memory-rs进行共享内存
  • 采用成熟的neon进行ffi模块编写

项目实施

使用neon脚手架搭建项目

  • neon new cache-machine ---》 创建项目
  • neon build ---》编译项目
  • node lib/index.js 运行项目

编写rust 模块

代码语言:javascript
复制
extern crate neon;
extern crate shared_memory;
use neon::prelude::*;
use neon::register_module;
use shared_memory::*;

use std::ffi::{CStr, CString};

/**
 * 定义缓存区块
*/
#[derive(SharedMemCast)]
struct ShmemStructCache {
    num_slaves: u32,
    message: [u8; 256],
}

static GLOBAL_LOCK_ID: usize = 0;

/**
 * sharedMemory全局句柄,避免对进程重复创建
*/
static mut SHMEM_GLOBAL: Option<shared_memory::SharedMem> = None;

/**
 * 创建sharedMemory
*/
fn create_open_mem() -> Result<shared_memory::SharedMem, SharedMemError> {
    let shmem = match SharedMem::create_linked("shared_mem.link", LockType::Mutex, 4096) {
        Ok(v) => v,
        Err(SharedMemError::LinkExists) => SharedMem::open_linked("shared_mem.link")?,
        Err(e) => return Err(e),
    };

    if shmem.num_locks() != 1 {
        return Err(SharedMemError::InvalidHeader);
    }
    Ok(shmem)
}

/**
 * 设置SharedMemory
*/
fn set_cache(set_cache: String) -> Result<String, SharedMemError> {
    {
        let mut  shared_state =  unsafe { SHMEM_GLOBAL.as_mut().unwrap().wlock::<ShmemStructCache>(GLOBAL_LOCK_ID)?};
        let set_string: CString = CString::new(set_cache.as_str()).unwrap();
        shared_state.message[0..set_string.to_bytes_with_nul().len()]
            .copy_from_slice(set_string.to_bytes_with_nul());
    }
    Ok("".to_owned())
}

/**
 * 读取SharedMemory
*/
fn get_cache() -> Result<String, SharedMemError> {
    let   result =
    {
        let shmem = unsafe { SHMEM_GLOBAL.as_mut().unwrap()};
        let shared_state = shmem.rlock::<ShmemStructCache>(GLOBAL_LOCK_ID)?;
        let shmem_str: &CStr = unsafe { CStr::from_ptr(shared_state.message.as_ptr() as *mut i8) };
         shmem_str.to_str().unwrap().into()
    };

    Ok(result)
}

/**
 * 暴露给js端get的方法
*/
fn get(mut cx: FunctionContext) -> JsResult<JsString> {
    match get_cache() {
        Ok(v) => Ok(cx.string(v)),
        Err(_) => Ok(cx.string("error")),
    }
}

/**
 * 暴露给js端的set方法
*/
fn set(mut cx: FunctionContext) -> JsResult<JsString> {
    let value = cx.argument::<JsString>(0)?.value();
    match set_cache(value) {
        Ok(v) => Ok(cx.string(v)),
        Err(e) => Ok(cx.string("error")),
    }
}

register_module!(mut m, {
  unsafe {
    SHMEM_GLOBAL = match create_open_mem() {
      Ok(v) => Some(v),
      _ => None,
    };
  }
  set_cache("".to_owned());
  m.export_function("get", get)?;
  m.export_function("set", set)?;
  Ok(())
});

编写js模块包

代码语言:javascript
复制
var addon = require('../native');

/**
 * 
 * @param {缓存的键} key 
 * @param {缓存的值} value 
 */
function set(key, value) {
    let cache = get();
    cache[key] = value;
    addon.set(JSON.stringify(cache));
}

/**
 * 
 * @param {根据键名得到内容} key 
 */
function get(key) {
    const shared_memory = addon.get();
    if (shared_memory) {
        const cache = JSON.parse(cache)
        if (key) {
            return cache[key];
        } else {
            return cache;
        }

    } else {
        return {};
    }
}

module.exports = {
    set,
    get
}

// cache machine

var cache = require("cache-machine");

cache.set('key', 'value');

cache.get('key');

存在问题

rust端对不同进程做了访问控制,没有对线程做控制,考虑到node多线程场景[Worker Threads同时操作某变量]在实际业务中并未发现使用,所以后序增加线程间安全控制

  • 多进程安全的共享内存
  • 多线程安全的共享内存 TODO
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 利用rust帮助node进程间共享内存
    • 遇到问题
      • 分析问题
        • 解决问题
        • 为何采用shared memory帮助node共享内存
        • 如何使用shared memory 快速解决问题
      • 项目实施
        • 使用neon脚手架搭建项目
        • 编写rust 模块
        • 编写js模块包
      • 存在问题
      相关产品与服务
      消息队列 CMQ 版
      消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档