Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >消息中间件—RocketMQ的RPC通信(一)

消息中间件—RocketMQ的RPC通信(一)

作者头像
用户2991389
发布于 2018-09-05 04:43:07
发布于 2018-09-05 04:43:07
1.5K00
代码可运行
举报
运行总次数:0
代码可运行

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键

一、RocketMQ中Remoting通信模块概览

RocketMQ消息队列的整体部署架构如下图所示:

RocketMQ整体的架构集群图.jpg

先来说下RocketMQ消息队列集群中的几个角色: (1)NameServer:在MQ集群中做的是做命名服务,更新和路由发现 broker服务; (2)Broker-Master:broker 消息主机服务器(3)Broker-Slave:broker 消息从机服务器; (4)Producer:消息生产者; (5)Consumer:消息消费者;

其中,RocketMQ集群的一部分通信如下: (1)Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息; (2)消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取; (3)消息生产者Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者收消息并落盘存储; 从上面(1)~(3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。 rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。 ps:鉴于RocketMQ的通信模块是建立在Netty基础之上的,因此在阅读RocketMQ的源码之前,读者最好先对Netty的多线程模型、JAVA NIO模型均有一定的了解,这样子理解RocketMQ源码会较为快一些。 作者阅读的RocketMQ版本是4.2.0, 依赖的netty版本是4.0.42.Final. RocketMQ的代码结构图如下:

RocketMQ的Remoting源代码目录结构.png

源码部分主要可以分为rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,通信框架就封装在rocketmq-remoting模块中。 本文主要从RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)、通信流程和Remoting模块的Netty多线程处理架构等方面介绍RocketMQ的通信模块。

二、RocketMQ中Remoting通信模块的具体实现

1、Remoting通信模块的类结构图

RocketMQ的Remoting模块类结构图.png

从类层次结构来看: (1)RemotingService:为最上层的接口,提供了三个方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);

(2)RemotingClient/RemotingSever:两个接口继承了最上层接口—RemotingService,分别各自为Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * 同RemotingClient端一样
     *
     * @param requestCode
     * @param processor
     * @param executor
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 注册默认的处理器
     *
     * @param processor
     * @param executor
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    /**
     * 根据请求code来获取不同的处理Pair
     *
     * @param requestCode
     * @return
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * 同RemotingClient端一样,同步通信,有返回RemotingCommand
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    /**
     * 同RemotingClient端一样,异步通信,无返回RemotingCommand
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 同RemotingClient端一样,单向通信,诸如心跳包
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;

(3)NettyRemotingAbstract:Netty通信处理的抽象类,定义并封装了Netty处理的公共处理方法; (4)NettyRemotingClient/NettyRemotingServer:分别实现了RemotingClient和RemotingServer, 都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件)

2、消息的协议设计与编码解码

在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。 RemotingCommand类的部分成员变量如下:

Header字段

类型

Request说明

Response说明

code

int

请求操作码,应答方根据不同的请求码进行不同的业务处理

应答响应码。0表示成功,非0则表示各种错误

language

LanguageCode

请求方实现的语言

应答方实现的语言

version

int

请求方程序的版本

应答方程序的版本

opaque

int

相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应

应答不做修改直接返回

flag

int

区分是普通RPC还是onewayRPC得标志

区分是普通RPC还是onewayRPC得标志

remark

String

传输自定义文本信息

传输自定义文本信息

extFields

HashMap<String, String>

请求自定义扩展信息

响应自定义扩展信息

这里展示下Broker向NameServer发送一次心跳注册的报文:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
[
code=103,//这里的103对应的code就是broker向nameserver注册自己的消息
language=JAVA,
version=137,
opaque=58,//这个就是requestId
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON

下面来看下RocketMQ通信协议的格式:

RocketMQ中Remoting协议格式.png

可见传输内容主要可以分为以下4部分: (1)消息长度:总长度,四个字节存储,占用一个int类型; (2)序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度; (3)消息头数据:经过序列化后的消息头数据; (4)消息主体数据:消息主体的二进制字节数据内容; 消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成,下面是消息编码encode方法的具体实现:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息总长度

    // 2> header data length
    //将消息头编码成byte[]
    byte[] headerData = this.headerEncode(); 
    //计算头部长度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主体长度
        length += body.length;                
    }
    //分配ByteBuffer, 这边加了4, 
    //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //将消息总长度放入ByteBuffer
    result.putInt(length);   

    // header length
    //将消息头长度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //将消息头数据放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //将消息主体放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

    /**
     * markProtocolType方法是将RPC类型和headerData长度编码放到一个byte[4]数组中
     *
     * @param source
     * @param type
     * @return
     */
    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        //右移16位后再和255与->“16-24位”
        result[1] = (byte) ((source >> 16) & 0xFF);
        //右移8位后再和255与->“8-16位”
        result[2] = (byte) ((source >> 8) & 0xFF);
        //右移0位后再和255与->“8-0位”
        result[3] = (byte) (source & 0xFF);
        return result;
    }

消息解码decode方法是编码的逆向过程,其具体实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        //获取byteBuffer的总长度
        int length = byteBuffer.limit();

        //获取前4个字节,组装int类型,该长度为总长度
        int oriHeaderLen = byteBuffer.getInt();

        //获取消息头的长度,这里和0xFFFFFF做与运算,编码时候的长度即为24位
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

3、消息的通信方式和通信流程

在RocketMQ消息队列中支持通信的方式主要有以下三种: (1)同步(sync) (2)异步(async) (3)单向(oneway) 其中“同步”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。本文将主要介绍RocketMQ的异步通信流程(限于篇幅,读者可以按照同样的模式进行分析同步通信流程)。 下面先给出了RocketMQ异步通信的整体流程图:

RocketMQ异步通信的整体时序图.png

下面两小节内容主要介绍了Client端发送请求消息和Server端接收消息的具体实现,其中对于Client端的回调可以参考RocketMQ的源码来分析这里就不做详细介绍。

3.1、Client发送请求消息的具体实现

当客户端调用异步通信接口—invokeAsync时候,先由RemotingClient的实现类—NettyRemotingClient根据addr获取相应的channel(如果本地缓存中没有则创建),随后调用invokeAsyncImpl方法,将数据流转给抽象类NettyRemotingAbstract处理(真正做完发送请求动作的是在NettyRemotingAbstract抽象类的invokeAsyncImpl方法里面)。具体发送请求消息的源代码如下所示:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
/**
     * invokeAsync(异步调用)
     * 
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1

        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //根据request ID构建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //将ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                //使用Netty的channel发送请求数据
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    //消息发送后执行
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            //如果发送消息成功给Server,那么这里直接Set后return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            //执行回调
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            //释放信号量
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                //异常处理
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }

在Client端发送请求消息时有个比较重要的数据结构需要注意下: (1)responseTable—保存请求码与响应关联映射

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable 

opaque表示请求发起方在同个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方式。无论是哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射—responseTable中。 (2)ResponseFuture—保存返回响应(包括回调执行方法和信号量)

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }

对于同步通信来说,第三、四个参数为null;而对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个线程同时往一个连接写数据时可以通过信号量控制permit同时写许可的数量。 (3)异常发送流程处理—定时扫描responseTable本地缓存 在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将会出现堆积情况。这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生一个频率为1s调用一次来的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回, 并进行相应的处理。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }

3.2、Server端接收消息并进行处理的具体实现

Server端接收消息的处理入口在NettyServerHandler类的channelRead0方法中,其中调用了processMessageReceived方法(这里省略了Netty服务端消息流转的大部分流程和逻辑)。其中服务端最为重要的处理请求方法实现如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根据RemotingCommand中的code获取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor处理请求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封装requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想线程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //构建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

上面的请求处理方法中根据RemotingCommand的请求业务码来匹配到相应的业务处理器;然后生成一个新的线程提交至对应的业务线程池进行异步处理。 (1)processorTable—请求业务码与业务处理、业务线程池的映射变量

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

我想RocketMQ这种做法是为了给不同类型的请求业务码指定不同的处理器Processor处理,同时消息实际的处理并不是在当前线程,而是被封装成task放到业务处理器Processor对应的线程池中完成异步执行。(在RocketMQ中能看到很多地方都是这样的处理,这样的设计能够最大程度的保证异步,保证每个线程都专注处理自己负责的东西

三、总结

刚开始看RocketMQ源码—RPC通信模块可能觉得略微有点复杂,但是只要能够抓住Client端发送请求消息、Server端接收消息并处理的流程以及回调过程来分析和梳理,那么整体来说并不复杂。RPC通信部分也是RocketMQ源码中最重要的部分之一,想要对其中的全过程和细节有更为深刻的理解,还需要多在本地环境Debug和分析对应的日志。同时,鉴于篇幅所限,本篇还没有来得及对RocketMQ的Netty多线程模型进行介绍,将在消息中间件—RocketMQ的RPC通信(二)篇中来做详细地介绍。 在此顺便为自己打个Call,有兴趣的朋友可以关注下我的个人公众号:“匠心独运的博客”,对于Java并发、Spring、数据库和消息队列的一些细节、问题的文章将会在这个公众号上发布,欢迎交流与讨论。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.06.30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
当一个程序员决定穿上粉色裤子
这个想法在我脑海中不停地闪现,始终没有遇到特别合适的契机进行实践。直到最近,我遇到了一个名为 Fashion AI 的项目,它主要利用微调模型对服装图片进行分割(segmentation),然后裁剪出图像中标注(label)的时尚单品,并将所有图片调整为相同的大小,最后将这些图像转化为 embedding 向量存储在开源向量数据库 Milvus 中。通过这个项目可以在 Milvus 数据库中查询并获得 3 个最相似的向量结果。随后,就可以通过上传一张自己穿着打扮的照片,最终确定与我们时尚风格最为相似的明星。
Zilliz RDS
2023/08/25
3910
当一个程序员决定穿上粉色裤子
手把手教你训练自己的Mask R-CNN图像实例分割模型(PyTorch官方教程)
关于Mask R-CNN的详细理论说明,可以参见原作论文https://arxiv.org/abs/1703.06870,网上也有大量解读的文章。本篇博客主要是参考了PyTorch官方给出的训练教程,将如何在自己的数据集上训练Mask R-CNN模型的过程记录下来,希望能为感兴趣的读者提供一些帮助。
全栈程序员站长
2022/09/23
3.8K1
手把手教你训练自己的Mask R-CNN图像实例分割模型(PyTorch官方教程)
30分钟吃掉YOLOv8实例分割范例
本范例我们使用 torchkeras来实现对 ultralytics中的YOLOv8实例分割模型进行自定义的训练,从而对气球进行检测和分割。
lyhue1991
2023/09/17
2.5K1
30分钟吃掉YOLOv8实例分割范例
LLM入门5 | SAM代码从入门到出门 | MetaAI
非常好加载,基本上pytorch和torchvision版本不太落后就可以加载。里面的model_type需要和模型参数对应上,"vit_h"或者"vit_l"或者"vit_b",即便加载最大的2.4G的vit_h模型,也只需要占用8G的显卡。算是非常小的模型了。这里SAM测试的效果,很多情况下效果并不太好,是一个foundation model,我觉得主要原因是模型参数比较少。导致他不能很好的解决所有的问题。正确用法是对小领域最微调。
机器学习炼丹术
2023/09/02
1.2K0
LLM入门5 | SAM代码从入门到出门 | MetaAI
Marker 源码解析(二)
ApacheCN_飞龙
2024/03/09
2070
手把手入门教程:YOLOv8如何训练自己的数据集,交通信号灯识别
Ultralytics YOLOv8是Ultralytics公司开发的YOLO目标检测和图像分割模型的最新版本。YOLOv8是一种尖端的、最先进的(SOTA)模型,它建立在先前YOLO成功基础上,并引入了新功能和改进,以进一步提升性能和灵活性。它可以在大型数据集上进行训练,并且能够在各种硬件平台上运行,从CPU到GPU。
AI小怪兽
2023/11/03
6.8K0
Yolov8 源码解析(四十二)
ApacheCN_飞龙
2024/09/13
3530
PyTorch专栏(八):微调基于torchvision 0.3的目标检测模型
【磐创AI 导读】:本篇文章讲解了PyTorch专栏的第四章中的微调基于torchvision 0.3的目标检测模型。查看专栏历史文章,请点击下方蓝色字体进入相应链接阅读。查看关于本专栏的介绍:PyTorch专栏开篇。
磐创AI
2019/09/17
3K0
PyTorch专栏(八):微调基于torchvision 0.3的目标检测模型
Transformers 4.37 中文文档(九十一)
MGP-STR 模型由 Peng Wang、Cheng Da 和 Cong Yao 在多粒度预测用于场景文本识别中提出。MGP-STR 是一个概念上简单但强大的视觉场景文本识别(STR)模型,它建立在视觉 Transformer(ViT)之上。为了整合语言知识,提出了多粒度预测(MGP)策略,以隐式方式将语言模态的信息注入模型中。
ApacheCN_飞龙
2024/06/26
3540
Transformers 4.37 中文文档(九十一)
轻松学Pytorch – 行人检测Mask-RCNN模型训练与使用
大家好,这个是轻松学Pytorch的第20篇的文章分享,主要是给大家分享一下,如何使用数据集基于Mask-RCNN训练一个行人检测与实例分割网络。这个例子是来自Pytorch官方的教程,我这里是根据我自己的实践重新整理跟解读了一下,分享给大家。
OpenCV学堂
2020/08/20
3.5K0
轻松学Pytorch – 行人检测Mask-RCNN模型训练与使用
Caffe2 - (二十二) Detectron 之数据集加载与处理函数
如果处理新的数据集时,强烈推荐将数据集转化为 COCO json 格式,重用先有数据代码即可.
AIHGF
2019/02/27
1.3K0
Yolov8 源码解析(四十三)
ApacheCN_飞龙
2024/09/13
2190
TorchVision Faster R-CNN 微调,实战 Kaggle 小麦检测
本文将利用 TorchVision Faster R-CNN 预训练模型,于 Kaggle: 全球小麦检测[1] ? 上实践迁移学习中的一种常用技术:微调(fine tuning)。 本文相关的 Ka
GoCoding
2021/05/06
1.3K0
TorchVision Faster R-CNN 微调,实战 Kaggle 小麦检测
目标检测的常用数据处理方法!
在上节内容中,我们介绍了目标检测的基础概念,并分析了实现目标检测的常用思路,本篇文章将重点介绍在该领域的经典数据集:VOC数据集,以及使用Dataloader对其进行数据读取和预处理的全过程。
Datawhale
2021/01/07
8620
目标检测的常用数据处理方法!
Transformers 4.37 中文文档(九十三)
Pix2Struct 模型是由 Kenton Lee, Mandar Joshi, Iulia Turc, Hexiang Hu, Fangyu Liu, Julian Eisenschlos, Urvashi Khandelwal, Peter Shaw, Ming-Wei Chang, Kristina Toutanova 在《Pix2Struct: Screenshot Parsing as Pretraining for Visual Language Understanding》中提出的。
ApacheCN_飞龙
2024/06/26
2840
Transformers 4.37 中文文档(九十三)
使用YOLO11分割和高斯模糊创建人像效果
本文通过结合最新的YOLO11实例分割模型和高斯模糊,为你的图片应用人像效果。我们将使用YOLO11将人物从背景中分割出来,并对除了主体之外的所有内容应用模糊效果。
小白学视觉
2024/10/11
2430
使用YOLO11分割和高斯模糊创建人像效果
Transformers 4.37 中文文档(六十六)
Deformable DETR 模型是由 Xizhou Zhu,Weijie Su,Lewei Lu,Bin Li,Xiaogang Wang,Jifeng Dai 在Deformable DETR: Deformable Transformers for End-to-End Object Detection中提出的。Deformable DETR 通过利用一个新的可变形注意力模块,该模块只关注参考周围一小组关键采样点,从而缓解了原始 DETR 的收敛速度慢和特征空间分辨率有限的问题。
ApacheCN_飞龙
2024/06/26
5030
Transformers 4.37 中文文档(六十六)
Transformers 4.37 中文文档(六十八)
FocalNet 模型是由 Jianwei Yang、Chunyuan Li、Xiyang Dai、Lu Yuan、Jianfeng Gao 在焦点调制网络中提出的。FocalNets 完全用焦点调制机制取代了自注意力(在模型中使用,如 ViT 和 Swin),用于建模视觉中的令牌交互。作者声称,FocalNets 在图像分类、目标检测和分割任务上优于基于自注意力的模型,且具有类似的计算成本。
ApacheCN_飞龙
2024/06/26
1970
Transformers 4.37 中文文档(六十八)
PyTorch 2.2 中文官方教程(四)
作者: Jeremy Howard,fast.ai。感谢 Rachel Thomas 和 Francisco Ingham。
ApacheCN_飞龙
2024/02/05
5660
PyTorch 2.2 中文官方教程(四)
人脸检测——mtcnn思想,生成negative、positive、part样本。
摘要总结:本文研究了面部识别技术,详细阐述了其基本原理、发展历程、应用领域以及未来前景。
MachineLP
2018/01/09
1.3K0
推荐阅读
相关推荐
当一个程序员决定穿上粉色裤子
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文