专栏首页Ceph对象存储方案基于RGW的多媒体处理框架

基于RGW的多媒体处理框架

基于RGW的多媒体处理框架

背景

业务需要在原有RGW的服务基础上加上对多媒体类资源的处理,比如图片resize、视频转码等。参考过多个厂家的设计,发现对方的多媒体类处理都是在URL里面加上query string来实现,比如: 裁剪正中部分,等比缩小生成200x200缩略图,对应的URL如下: http://odum9helk.qnssl.com/resource/gogopher.jpg?imageView2/1/w/200/h/200

图片操作的配置参数都是通过增加Query String=imageView2/1/w/200/h/200

问题

  1. 用户体验:这种设计咋看起来很直观,但是一旦QueryString字段多了或者整个字符串过长以后,肉眼看起来非常累,特别是排错调参的时候,你会觉得自己像小学生数着手指头算数。
  2. 代码开发:引入这种设计,在RGW中整个请求路由层写起来会要一堆正则,很容易破坏现有的代码结构。(个人愚见)
  3. 版本维护:随着各种新增加功能模块的不断引入,RGW的版本维护会变得比较痛苦。同时要知道原生RGW现在没有平滑升级方案,必须重启。(当然有其他办法解决)

思路:

  1. 通过复用S3请求中的用户自定义元数据字段(header中x-amz-meta-开头)来加载多媒体处理任务信息,从而绕开需要单独设计Restful API及完成相关签名认证部分的代码设计,同时做到一个数据上传请求同时包含了多媒体处理任务的下发。(异步场景)
  2. 复用原生S3 Response Header中的x-amz-request-id来实现任务ID的生成(全局唯一),实现基于ID的任务的跟踪与管理。(异步场景)
  3. 异步任务完成以后,转码后的文件会按指定路径存放,如图中的"tmp/new-img.jpg"。
  4. 在整个服务入口网关处加上对业务类型的逻辑判断,同步类型的请求在本地操作(减少后端RGW压力,同时减少IO路径),异步请求发后台服务器。这样做还有一个好处是方便后期扩展,同步服务能力不足就新增入口节点,异步处理能力不足就新增后台服务器。

需求描述

  1. 客户端在尽量少改动现有接口API的情况下完成图片、视频等多种类型多媒体文件的转码一类处理。
  2. 满足业务数据上传和转码操作在同一次Request请求中提交,减少请求次数。
  3. 针对后台业务的处理能力及时长不同,需要同时支持同步(小图片)及异步(大视频)两种类型的任务下发操作。

构架介绍

  • Client:请求发起端,提交数据及转码相关任务到服务端。
  • Openresty:服务网关,接受客户端的请求,根据请求的类型(同步or异步)来进行不同的处理。其中http_image_filter_module用于演示图片resize的同步操作,log_by_lua_file用于向后端发送异步任务到kafka。
  • RGW:提供基础S3对象存储服务。
  • Kafka:负责日志消息存储及转发。
  • Job Server:多媒体处理服务,负责从kafka中取任务,之后将任务状态更新到DB中。
  • DB:记录任务状态数据,供前台服务查询。

任务操作流程-异步方式

  1. 任务提交阶段
  • Step1. 客户端使用普通的Put object请求,只需要加上自定义的metadata字段即可完成数据及任务的提交。
  • Step2. Openresty使用proxy模式将请求转发到RGW,由RGW完成后台数据存储处理。
  • Step3. Openresty在RGW完成数据存储以后,调用log_by_lua_file将对应请求的用户自定义metadata及x-amz-request-id转发到后台kafka。
  • Setp4. Job Server从kafka中拿到新增的任务及相应ID。
  • Setp5. Job Server从RGW下载对应的Object数据到本地,执行相应的多媒体处理操作,并将结果更新到DB
  1. 任务查询阶段 Step1. 客户端使用之前记录下来的x-amz-request-id向服务器发起任务查询请求(这里需要单独做一个服务查询接口)。 Step2. Openresty接收到客户端查询请求以后从DB中查询相应的任务状态数据并返回给客户端。

前端逻辑

  • Step1. Job Server将处理完成的任务数据,按照用户指定的对象名称写回RGW。
  • Step2. Job Server完成RGW的数据写入以后,更新相应的任务状态到DB。

后端逻辑

  • Step1. 客户端使用之前记录下来的x-amz-request-id向服务器发起任务查询请求(这里需要单独做一个服务查询接口)。
  • Step2. Openresty接收到客户端查询请求以后从DB中查询相应的任务状态数据并返回给客户端。

3. 获取任务结果

  • Step1. 客户端在上一步骤中查询到对应的任务已经完成的情况下,直接通过Get Object即可取回转码完成后的数据。

任务操作流程-同步方式

  1. 准备阶段
    • Step1. 客户端需要提前上传好Object,之后设置对应的ACL=public-read。(设置public-read是为了演示方便,如果要鉴权则需要修改nginx的module逻辑)
  2. 使用阶段
  • Step1. 客户端在需要获取的Object的URL后面加上对应的height和width参数等。
  • Step2. Openresty拿到后台RGW的数据以后,将数据加载到内存之后经过http_image_filter_module模块处理,完成内存中临时数据的生成,并将内存中的临时数据返回给用户。

Demo演示

Demo实现以下功能: 同步请求:借助已有的http_image_filter_module模块实现已经存储在RGW的图片(Acl=Public-read)的在线resize操作。 异步请求:过滤Put Object成功的请求,并将其中的用户自定义metadata及x-amz-request-id发送到后端kafka。(kafka之后的Job Server相关需要自己实现,这里省略)

1. 环境配置说明

使用的是openresty/1.15.8.2,需要编译的时候需要加上"--with-http_image_filter_module"来开启http_image_filter_module模块。

/usr/local/openresty/nginx/conf/nginx.conf 配置如下

    server {
        listen       80;
        server_name  test.s3.c.local *.s3.jrss.c.local;
        client_max_body_size 20m;
    location ~ "^(/.*/.*\.(jpg|png|jpeg))!c(\d+)-(\d+)$" {
        set $w $3;
        set $h $4;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!c(\d+)-(\d+)$ $1 break;
        image_filter crop $w $h; #裁剪图片
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }
    location ~ "^(/.*/.*\.(jpg|png|jpeg))!r(\d+)-(\d+)$" {
        set $w $3;
        set $h $4;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!r(\d+)-(\d+)$ $1 break;
        image_filter resize $w $h; #缩略图
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }
    location ~ "^(/.*/.*\.(jpg|png|jpeg))!d(\d+)$" {
        set $d $3;
        rewrite ^(/.*/.*\.(jpg|png|jpeg))!d(\d+)$ $1 break;
        image_filter rotate $d; #翻转图片
        image_filter_buffer 20M;
        proxy_pass http://172.25.60.215:9000;
    }
        location / {
            proxy_http_version 1.1;
            proxy_set_header Connection "";
            proxy_set_header HTTP_PROXY "";
            proxy_set_header Proxy "";
            proxy_set_header Host $host;
            proxy_set_header X-Forwarded-For $remote_addr;
            proxy_pass http://172.25.60.215:9000;
        log_by_lua_file /usr/local/openresty/nginx/conf/kafka.lua;
        }

其中/usr/local/openresty/nginx/conf/kafka.lua用来将用户自定义的metadata及任务ID转发到kafka。

local cjson = require "cjson"
local producer = require "resty.kafka.producer"
local broker_list = {
    { host = "localhost", port = 9092 },
}
function send_job_to_kafka()
    local req_header = ""
    local udf_meta_reg = "^x-amz-meta-"
    local headers_ = ngx.req.get_headers()
    for k, v in pairs(headers_) do
        local meta_check, meta_err = ngx.re.match(k,udf_meta_reg)
        if meta_check then
         req_header = req_header .. k.."="..v.." "
        end
    end
    local log_json = {}
    log_json["uri"]=ngx.var.uri
    log_json["host"]=ngx.var.host
    log_json["remote_addr"] = ngx.var.remote_addr
    log_json["status"] = ngx.var.status
    log_json["request_method"] = ngx.var.request_method
    log_json["req_header"] = req_header
    log_json["http_x_amz_request_id"] = ngx.var.upstream_http_x_amz_request_id
    local message = cjson.encode(log_json);
    return message
end

local is_args = ngx.var.is_args
local request_method = ngx.var.request_method
local status_code = ngx.var.status

-- 过滤Put Object成功的请求,记录相应的metadata及请求ID,并转发到kafka
if request_method == "PUT" and status_code == "200" and is_args == "" then
    local bp = producer:new(broker_list, { producer_type = "async" })
    local ok, err = bp:send("test", "key", send_job_to_kafka())
    if not ok then
        ngx.log(ngx.ERR, "kafka send err:", err)
        return
    end
    ngx.log(ngx.ERR, "kafka send sucessful:", ok)
end

2. 测试用例

import boto3
from botocore.client import Config

aws_access_key_id = '' #AK
aws_secret_access_key = '' #SK
endpoint = 'http://test.s3.c.local'
bucket = 'demo' #bucket名称
objcet = '1.jpg' #对象名称
file_path = '/Desktop/op.jpg' #本地图片文件路径
# aws4
s3 = boto3.client('s3', region_name='cn-hb-pri1',
                  use_ssl=False,
                  endpoint_url=endpoint,
                  aws_access_key_id=aws_access_key_id,
                  aws_secret_access_key=aws_secret_access_key,
                  config=Config(signature_version='s3v4',
                                s3={'addressing_style': 'path'}))


with open(file_path, 'r') as f:
    response = s3.put_object(
        ACL='public-read',
        Body=f,
        Bucket=bucket,
        Key=objcet,
        ContentType='image/jpeg',
        Metadata={
            'save-path': 'tmp/new-img.jpg',
            'width': '1920',
            'heigh': '1200',
        },
    )

print "{}/{}/{}".format(endpoint,bucket,objcet) #默认分辨率图片地址
print "{}/{}/{}!r50-50".format(endpoint,bucket,objcet) #生成50x50分辨率缩略图
print "{}/{}/{}!r100-100".format(endpoint,bucket,objcet) #裁剪图片
print "{}/{}/{}!d90".format(endpoint,bucket,objcet) #图片旋转90度

抓包可以看到整个请求头部的内容如下:

上面脚本跑完会生成2个URL,使用对应的URL就可以对比看到对应的图片resize等效果。

通过运行下面的KafkaConsumer脚本,可以看到异步任务对应的kafka消息数据也发送成功

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('test',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

结果截图

本文分享自微信公众号 - Ceph对象存储方案(cephbook),作者:秦牧羊

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 对象存储服务-构架设计

    对象存储服务构架设计 基本构架组成 ? 1.网关服务(Gateway): 客户端发送请求(Request)到网关服务(Gateway)再由网关服务实现将客户端请...

    用户1260683
  • 实弹军演-基于Ceph对象存储的实战兵法

    1 知己知彼,百战不殆 剖析业务IO模型 了解业务基本存储模型: 最高并发多少,最高读写带宽需求。 并发多少决定了在知道单个RGW最大并发数上限的前提下你需要用...

    用户1260683
  • RGW奇淫技巧-玩转system特权

    开启system特权 root@demohost:/home/user# radosgw-admin user modify --system=1 --uid=...

    用户1260683
  • 番外:同步异步阻塞非阻塞,了解一下?(十三节)

    今天这一节严格意义上其实不能算一个章节而应该是一个番外篇。因为通过前面翔实而又丰富的内容中,我认为大家已经具备了可以理解[ 同步、异步、阻塞、非阻塞 ]的条件了...

    老李秀
  • Hygieia-你值得拥有!!!(下篇)

    书接上篇,我们介绍了Hygieia的架构图、应用的的技术、以及主要工程的搭建步骤,现在Hygieia系统已经能够完整的运行起来了,但是如果要充分发挥Hygiei...

    用户5521279
  • 浅析为何能通过FDStackView在iOS9以下使用UIStackView

    前几天看到sunnyxx团队的新作FDStackView。大家都知道在iOS9苹果提供了一个新的玩具UIStackView,然而在iOS9以前是没有办法使用的。...

    100000798482
  • 推荐一个c++小巧开源且跨平台的图像解码库

    该图像解码库仅仅三个文件。 图像处理封装: spot.cpp spot.h 解码库实现: spot.c  支持图片文件格式如下: File formatRe...

    cpuimage
  • Spark基础-scala学习(三、Trait)

    老梁
  • CentOS7下安装PostgreSQL12

    PostgreSQL是一个功能强大的开源数据库系统。经过长达15年以上的积极开发和不断改进,PostgreSQL已在可靠性、稳定性、数据一致性等获得了业内极高的...

    yuanfan2012
  • 从“陪练”到“赢家”:人机博弈的六十年

    大数据文摘

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动