RabbitMQ的启动步骤是一个有向无环图,具体细节后面另外章节再聊,其中网络一块的启动集中在文件rabbit_network.erl中
boot() ->
ok = record_distribution_listener(),
ok = start(),
ok = boot_tcp(),
ok = boot_ssl().
第1行往mnesia中插入监听信息,第4行是SSL的,我们也先不看,重点看下第2、3行代码。
1、start函数
start() -> rabbit_sup:start_supervisor_child(
rabbit_tcp_client_sup, rabbit_client_sup,
[{local, rabbit_tcp_client_sup},
{rabbit_connection_sup,start_link,[]}]).
启动名为 rabbit_tcp_client_sup 的监督者进程,这个监督者进程的入口为
rabbit_client_sup:start_link,后面一长串为启动参数。
rabbit_tcp_client_sup 进程启动就做1件事:启动rabbit_connection_sup监督者进程。
而rabbit_connection_sup监督者进程会启动真正干活的 rabbit_reader进程,这个后面再细讲。
2、boot_tcp函数
boot_tcp() ->
{ok, TcpListeners} = application:get_env(tcp_listeners),
[ok = start_tcp_listener(Listener) || Listener <- TcpListeners],
ok.
根据tcp_listeners配置调用 start_tcp_listener 函数;
start_tcp_listener会调用start_listener,后者会调用start_listener0
start_listener0(Address, Protocol, Label, OnConnect) ->
Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
Protocol, Label, OnConnect),
case supervisor:start_child(rabbit_sup, Spec) of
{ok, _} -> ok;
{error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
exit({could_not_start_tcp_listener,
{rabbit_misc:ntoa(IPAddress), Port}})
end.
最终也是启动子进程,测试环境参数打印如下:
{'rabbit_tcp_listener_sup_:::5672',
{tcp_listener_sup,start_link,
[{0,0,0,0,0,0,0,0}, //IPAddress
5672, //Port
[inet6,binary,binary,
{packet,raw},
{reuseaddr,true},
{backlog,128},
{nodelay,true},
{linger,{true,0}},
{exit_on_close,false},
{active,false}], //SocketOpts
{rabbit_networking,tcp_listener_started,[amqp]}, //OnStartup
{rabbit_networking,tcp_listener_stopped,[amqp]}, //OnShutdown
{rabbit_networking,start_client,[]}, //AcceptCallback
"TCP Listener"]},
transient,infinity,supervisor,
[tcp_listener_sup]}
关注AcceptCallback函数,这个在每个连接进来的时候会调用,下面会再讲。
再以这些参数启动 tcp_listener_sup 监督者进程,后者再会启动 tcp_acceptor_sup和 tcp_listener子进程,tcp_acceptor_sup还是一个监督者进程,它会启动最终干活的 tcp_acceptor 进程。
是不是感觉有点晕,我们来整理下各进程关系:
tcp_listener 进程就是用来监听套接字的,它会调用 gen_tcp:listen 来监听套接字,下面会细讲。
上面我们整理了初始化顺序,那么一个连接过到底会经过哪些进程呢?
1)首先是tcp_listener
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
{ok, LSock} ->
lists:foreach(fun (_) ->
{ok, _APid} = supervisor:start_child(
AcceptorSup, [LSock])
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}}
end.
这里省略了一些异常分支代码。
tcp_listener进程在初始化时会调用 AcceptorSup 也就是 tcp_acceptor_sup 来启动工作线程,真正干活的是tcp_acceptor进程,这个进程会通过 prim_inet:async_accept 来异步 accespt连接,每当新连接进来时会调用 rabbit_networking:start_client 进行初始化(前面有说明),相关代码如下:
init({Callback, LSock}) ->
gen_server:cast(self(), accept),
{ok, #state{callback=Callback, sock=LSock}}.
accept(State = #state{sock=LSock}) ->
case prim_inet:async_accept(LSock, -1) of
{ok, Ref} -> {noreply, State#state{ref=Ref}};
Error -> {stop, {cannot_accept, Error}, State}
end.
rabbit_networking:start_client 会启动rabbit_reader,然后向其发送go 消息
start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = rabbit_net:controlling_process(Sock, Reader),
Reader ! {go, Sock, SockTransform},
再看看rabbit_reader是如何处理 go 消息的:
init(Parent, HelperSup) ->
Deb = sys:debug_options([]),
receive
{go, Sock, SockTransform} ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
start_connection然后会调用 recv_loop来启动消息循环,一直读包,然后解包,这块代码比较细,后面再细讲。
总结下,一个连接进程会经过的进程链如下,中间省略了监督者进程:
tcp_listener ——> tcp_acceptor ——> rabbit_reader