前面在《RabbitMQ网络框架代码分析》中讲了基本的网络框架,说明了一个连接进来会经过哪些文件;今天讲消息的具体处理过程,即一条消息过来之后,最终是如何分发到具体的处理函数的。
从 rabbit_reader:start_connection 说起吧:一个连接进来会调用这个函数,它会调用 recvloop 不断地读取数据,并进行相应的处理:
%% Excerpt of rabbit_reader:start_connection/5, called for each new
%% connection. Non-core code was removed from this excerpt, so the bindings
%% of ClientSock, Name, Host, PeerHost, Port, PeerPort and HandshakeTimeout,
%% as well as the end of the function, are not shown.
%%
%% It builds the initial #v1{} reader state and then enters recvloop/4 with
%% an empty buffer ([] with length 0), after switch_callback/3 has installed
%% `handshake` as the input callback and 8 as the number of bytes to
%% receive next.
start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
    State = #v1{parent = Parent,
                sock = ClientSock,
                connection = #connection{
                               name = list_to_binary(Name),
                               host = Host,
                               peer_host = PeerHost,
                               port = Port,
                               peer_port = PeerPort,
                               protocol = none,
                               user = none,
                               %% NOTE(review): presumably HandshakeTimeout is
                               %% in milliseconds and timeout_sec in seconds.
                               timeout_sec = (HandshakeTimeout / 1000),
                               frame_max = ?FRAME_MIN_SIZE,
                               vhost = none,
                               client_properties = none,
                               capabilities = [],
                               auth_mechanism = none,
                               auth_state = none,
                               connected_at = rabbit_misc:now_to_ms(os:timestamp())},
                callback = uninitialized_callback,
                recv_len = 0,
                pending_recv = false,
                %% recvloop/4 dispatches on connection_state (blocked,
                %% {become, F}); it starts as pre_init.
                connection_state = pre_init,
                queue_collector = undefined, %% started on tune-ok
                helper_sup = HelperSup,
                heartbeater = none,
                channel_sup_sup_pid = none,
                channel_count = 0,
                throttle = #throttle{
                              alarmed_by = [],
                              last_blocked_by = none,
                              last_blocked_at = never}},
    %% run/1 (not shown) is given {Module, Function, Args}; presumably it
    %% applies recvloop(Deb, [], 0, State1), where State1 is State with
    %% rabbit_event:init_stats_timer/2 applied to its stats_timer field and
    %% the `handshake` callback installed, expecting 8 bytes.
    run({?MODULE, recvloop,
         [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
                                        State, #v1.stats_timer),
                                      handshake, 8)]})
注:这里删掉了一些非核心代码,因此 ClientSock、Name、Host、PeerHost、Port、PeerPort、HandshakeTimeout 等变量的绑定没有出现在摘录中,函数结尾也省略了。
重点关注 switch_callback 函数:其第2个参数表示接下来由哪个回调处理数据,初始化时为 handshake;第3个参数设置接下来要接收的数据长度,初始化时为8个字节。
这里简单说下一次消息发送会发生哪些交互:
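这是在测试环境截取的包,其中第一个包就是客户端发送的 handshake 包(协议头),共8个字节:4个字节的字面量 "AMQP",后跟4个字节的协议版本号;AMQP 0-9-1 对应 0、0、9、1(可对照下文 handle_input(handshake, ...) 和 handshake({0, 0, 9, 1}, ...) 的模式匹配)。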
这里不深入讨论具体命令的含义,有兴趣的同学可以查下相关资料。
再来看recvloop 函数:
%% recvloop(Deb, Buf, BufLen, State)
%%
%% Receive loop of the connection reader. Buf is a list of received binaries
%% (mainloop/4 prepends new data as [Data | Buf]) holding BufLen bytes in
%% total. State#v1.recv_len is the number of bytes the current callback
%% (State#v1.callback) needs before it can run. The six clauses are numbered
%% 1-6 below, top to bottom.
%%
%% 1. A socket receive is already pending: wait in mainloop/4.
recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
    mainloop(Deb, Buf, BufLen, State);
%% 2. The connection is blocked: wait in mainloop/4 without consuming Buf.
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
    mainloop(Deb, Buf, BufLen, State);
%% 3. The state asks to switch to another loop function F: call F with the
%%    current loop arguments and throw {become, Result} to the caller.
recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
    throw({become, F(Deb, Buf, BufLen, State)});
%% 4. Fewer than RecvLen bytes buffered: re-arm the socket for one more
%%    message ({active, once}), mark a receive as pending and wait in
%%    mainloop/4. If setopts fails, stop the connection.
recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
  when BufLen < RecvLen ->
    case rabbit_net:setopts(Sock, [{active, once}]) of
        ok              -> mainloop(Deb, Buf, BufLen,
                                    State#v1{pending_recv = true});
        {error, Reason} -> stop(Reason, State)
    end;
%% 5. Enough data (BufLen >= RecvLen) held in a single binary: hand all of
%%    it to the callback, which returns the bytes it did not consume (Rest),
%%    and loop on those.
recvloop(Deb, [B], _BufLen, State) ->
    {Rest, State1} = handle_input(State#v1.callback, B, State),
    recvloop(Deb, [Rest], size(Rest), State1);
%% 6. Enough data spread over several binaries: split the buffer with
%%    binlist_split/3 so that BufLen - RecvLen bytes stay buffered, join the
%%    split-off part into one binary, and require the callback to consume
%%    all of it (the <<>> pattern raises badmatch otherwise).
recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
    {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
    Data = list_to_binary(lists:reverse(DataLRev)),
    {<<>>, State1} = handle_input(State#v1.callback, Data, State),
    recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
这里有6个分支(函数子句),我们从上到下依次编号为1~6,实际上调用最多的是4、5、6这三个分支。
刚开始要读取8个字节,callback 为 handshake,而此时缓冲区为空(BufLen 为 0,小于 RecvLen 的 8),分支1~3的条件也都不满足,所以走到分支4。
分支4先通过 rabbit_net:setopts 把 socket 设置为 {active, once},再调用 mainloop 读取数据(下面是 mainloop 的片段):
%% Fragment of mainloop/4 (the rest of the function is not shown): take the
%% next result from rabbit_net:recv/1. On {data, Data}, prepend Data to the
%% buffer, add its size to BufLen, clear pending_recv and re-enter
%% recvloop/4.
Recv = rabbit_net:recv(Sock),
case Recv of
    {data, Data} ->
        recvloop(Deb, [Data | Buf], BufLen + size(Data),
                 State#v1{pending_recv = false});
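读到数据之后,mainloop 把数据放进缓冲区、将 pending_recv 置为 false,然后再次进入 recvloop。如果读到的数据已经足够(不少于 RecvLen 字节),此时缓冲区中只有一个二进制块,于是来到分支5;否则会再次走分支4继续读取: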
%% recvloop/4, branch 5 (repeated from the full listing above): the whole
%% buffer is a single binary B. Pass it to the current callback and loop on
%% the unconsumed remainder Rest.
recvloop(Deb, [B], _BufLen, State) ->
    {Rest, State1} = handle_input(State#v1.callback, B, State),
    recvloop(Deb, [Rest], size(Rest), State1);
调用 handle_input 来处理数据,这里的callback是 handshake 。
handle_input也有不同的分支,我们看下 handshake 分支:
%% handle_input/3, `handshake` callback: the input must start with the
%% protocol header "AMQP" followed by four version bytes A, B, C, D. The
%% version tuple is passed to handshake/2, and the bytes after the header
%% (Rest) are returned together with the resulting state. Further
%% handle_input/3 clauses follow this one.
handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
    {Rest, handshake({A, B, C, D}, State)};
%% handshake/2 clause for the version bytes 0, 0, 9, 1 (AMQP 0-9-1): start
%% the connection as protocol {0, 9, 1} using the rabbit_framing_amqp_0_9_1
%% framing module. Further handshake/2 clauses follow this one.
handshake({0, 0, 9, 1}, State) ->
    start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
%% start_connection/3: answer the client's protocol header.
%% Sends connection.start on channel 0 with the protocol major/minor
%% version, server_properties(Protocol), the mechanisms returned by
%% auth_mechanisms_binary(Sock) and the <<"en_US">> locale. The `ok =` match
%% crashes the reader if sending fails. It then stores ?NORMAL_TIMEOUT and
%% the framing module in the connection, sets connection_state to
%% `starting`, and installs `frame_header` as the callback expecting 7
%% bytes. The protocol revision is ignored.
start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
                 Protocol,
                 State = #v1{sock = Sock, connection = Connection}) ->
    Start = #'connection.start'{
               version_major = ProtocolMajor,
               version_minor = ProtocolMinor,
               server_properties = server_properties(Protocol),
               mechanisms = auth_mechanisms_binary(Sock),
               locales = <<"en_US">> },
    ok = send_on_channel0(Sock, Start, Protocol),
    switch_callback(State#v1{connection = Connection#connection{
                                            timeout_sec = ?NORMAL_TIMEOUT,
                                            protocol = Protocol},
                             connection_state = starting},
                    frame_header, 7).
handshake 会调用 start_connection,后者会向客户端回一个 connection.start 包,并将 callback 设置为 frame_header,把接下来要读取的长度设置为7个字节。
frame_header 就是读取帧头。除了最开始的协议头(handshake)之外,后续数据都是固定格式的帧:
Type:1字节
Channel:2字节(代码中为 Channel:16,即16位)
Length:4字节(以上3项共7个字节,即帧头)
Body:长度为上面的Length
帧结束标记:1字节(?FRAME_END)
看下frame_header的处理:
%% handle_input/3, `frame_header` callback: parse the 7-byte frame header
%% (8-bit Type, 16-bit Channel, 32-bit PayloadSize). Then install
%% {frame_payload, Type, Channel, PayloadSize} as the callback and ask for
%% PayloadSize + 1 bytes: the payload plus the one-byte frame-end marker
%% that the frame_payload clause checks. Bytes after the header are
%% returned as Rest. Further handle_input/3 clauses follow this one.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
             State) ->
    {Rest, ensure_stats_timer(
             switch_callback(State,
                             {frame_payload, Type, Channel, PayloadSize},
                             PayloadSize + 1))};
它会将 callback 设置为 {frame_payload, Type, Channel, PayloadSize},并把接下来要读取的长度设为 PayloadSize + 1,即负载本身加上1字节的帧结束标记:
%% handle_input/3, frame_payload callback: split Data into the
%% PayloadSize-byte Payload, a one-byte EndMarker and any remaining bytes.
%% If EndMarker is ?FRAME_END, the frame is dispatched through
%% handle_frame/4 and the callback returns to frame_header (7 bytes).
%% Otherwise the frame is rejected with fatal_frame_error/5.
%% NOTE(review): the binary match raises badmatch if Data is shorter than
%% PayloadSize + 1 bytes; presumably recvloop/4 only calls this once
%% recv_len (set to PayloadSize + 1 by the frame_header clause) bytes are
%% available.
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
    <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
    case EndMarker of
        ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
                      {Rest, switch_callback(State1, frame_header, 7)};
        _          -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
                                        Type, Channel, Payload, State)
    end;
它先检查帧结束标记是否为 ?FRAME_END,然后调用 handle_frame 来分发命令(下面只摘录了 handle_frame 的其中一个子句):
%% handle_frame/4, clause for channel 0 while the connection is stopping
%% (?IS_STOPPING(State)). It decodes the frame with
%% rabbit_command_assembler:analyze_frame/3 using the connection's framing
%% module. A method frame is passed to handle_method0/3; anything else is
%% ignored and State is returned unchanged. This is only one clause; the
%% remaining handle_frame/4 clauses follow it and are not shown here.
handle_frame(Type, 0, Payload,
             State = #v1{connection = #connection{protocol = Protocol}})
  when ?IS_STOPPING(State) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
        {method, MethodName, FieldsBin} ->
            handle_method0(MethodName, FieldsBin, State);
        _Other -> State
    end;
handle_frame 会调用 rabbit_command_assembler:analyze_frame 来拆包,拆出来的就是具体的命令了。这里的 Protocol 就是 rabbit_framing_amqp_0_9_1,对应文件 src/rabbit_framing_amqp_0_9_1.erl,其中的 lookup_method_name 根据 {类ID, 方法ID} 得到方法名:
%% Excerpt of lookup_method_name/1 (rabbit_framing_amqp_0_9_1): maps an id
%% pair to the method name atom. In the entries shown, first element 10
%% corresponds to the 'connection.*' methods; presumably the pair is
%% {ClassId, MethodId} — confirm against the generated module. Only the
%% first entries are shown; more clauses follow.
lookup_method_name({10, 10}) -> 'connection.start';
lookup_method_name({10, 11}) -> 'connection.start_ok';
lookup_method_name({10, 20}) -> 'connection.secure';
lookup_method_name({10, 21}) -> 'connection.secure_ok';
lookup_method_name({10, 30}) -> 'connection.tune';
lookup_method_name({10, 31}) -> 'connection.tune_ok';
这里根据解析出的 {类ID, 方法ID} 得到相应的命令名。PS:用 Erlang 的模式匹配做这些确实很简单~
得到命令之后就是执行命令了,由 handle_method0 来执行。这里就不详细讲了,有兴趣可以自己看下代码,下面以 connection.start_ok 为例:
%% handle_method0/2 clause for connection.start_ok. It matches only while
%% connection_state is `starting`. It resolves the client's chosen
%% mechanism to a module with auth_mechanism_to_module/2 and takes the
%% "capabilities" table from the client properties ([] if it is missing or
%% not a table). It stores these, plus the mechanism's initial auth state
%% from AuthMechanism:init(Sock), in the connection, moves to `securing`,
%% and continues with auth_phase/2 using the client's Response.
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
                                      response = Response,
                                      client_properties = ClientProperties},
               State0 = #v1{connection_state = starting,
                            connection = Connection,
                            sock = Sock}) ->
    AuthMechanism = auth_mechanism_to_module(Mechanism, Sock),
    Capabilities =
        case rabbit_misc:table_lookup(ClientProperties, <<"capabilities">>) of
            {table, Capabilities1} -> Capabilities1;
            _ -> []
        end,
    State = State0#v1{connection_state = securing,
                      connection =
                          Connection#connection{
                            client_properties = ClientProperties,
                            capabilities = Capabilities,
                            auth_mechanism = {Mechanism, AuthMechanism},
                            auth_state = AuthMechanism:init(Sock)}},
    auth_phase(Response, State);
一部分命令的处理在 rabbit_reader.erl 中;还有一些命令是关于 channel 的,它们的处理在 rabbit_channel.erl 文件中。
最后我们总结下,消息分发流程:
1、start_connection
2、recvloop,它会调用mainloop读取数据
3、handle_input 处理数据
正常来说,一条消息的处理分这几步:
约定协议:handshake
读取帧头:frame_header
读取帧体:frame_payload
帧的格式是:
Type:1字节
Channel:2字节
Length:4字节
Body:Length字节
帧结束标记:1字节
4、消息处理
主要处理函数是 handle_frame,拆包之后调用相应的 handle_method0 子句来执行具体命令。