请选择 进入手机版 | 继续访问电脑版
查看: 6818|回复: 8

Rabbitmq的网络层要点浅析

[复制链接]
发表于 2009-12-15 14:13 | 显示全部楼层 |阅读模式
最近在锋爷的建议下开始读rabbitmq的源码,锋爷说这个项目已经很成熟,并且代码也很有借鉴和学习的意义,在自己写erlang代码之前看看别人是怎么写的,可以少走弯路,避免养成一些不好的习惯,学习一些最佳实践。读了一个星期,这个项目果然非常棒,代码也写的非常清晰易懂,一些细节的处理上非常巧妙,比如我这里想分享的网络层一节。
' N" Z9 f7 v5 k- }" g  \    Rabbitmq是一个MQ系统,也就是消息中间件,它实现了AMQP 0.8规范,简单来说就是一个TCP的广播服务器。AMQP协议,你可以类比JMS,不过JMS仅仅是java领域内的API规范,而AMQP比JMS更进一步,它有自己的wire-level protocol,有一套可编程的协议,中立于语言。简单介绍了Rabbitmq之后,进入正题。
) ^# v+ B5 g# J! E2 ~  u2 v    Rabbitmq充分利用了Erlang的分布式、高可靠性、并发等特性,首先看它的一个结构图:. }2 ]+ n) _) L9 l
1.png 6 K( y, R- D6 x( g* }2 l

5 v1 v0 n3 m7 g# V这张图展现了Rabbitmq的主要组件和组件之间的关系,具体到监控树的结构,我画了一张图:
" b  G2 Z6 d# m# L' l9 C. d1 K/ v; X8 D7 }- i5 l
2.png
% v7 o, u  X& x5 ^% O! \3 p  a+ h; C. i, w
; m2 d. u2 ~& F3 v3 K* b

& v" O$ {& @, S7 O3 r2 [2 ?" F; L% p8 D' X8 E4 K. V. b
/ g( X% e! w  p; h: o; u. j
    顶层是rabbit_sup supervisor,它至少有两个子进程,一个是rabbit_tcp_client_sup,用来监控每个connection的处理进程 rabbit_reader的supervisor;rabbit_tcp_listener_sup是监控tcp_listener和 tcp_acceptor_sup的supervisor,tcp_listener里启动tcp服务器,监听端口,并且通过 tcp_acceptor_sup启动N个tcp_accetpor,tcp_acceptor发起accept请求,等待客户端连接;tcp_acceptor_sup负责监控这些acceptor。这张图已经能给你一个大体的印象。
- l$ T' `/ O3 H   
" k$ Z+ e% c% I1 ~, G4 @" [7 b    讲完大概,进入细节,说说几个我觉的值的注意的地方:4 Q3 q; F; N3 T" L( T: C2 p( N# O
1、tcp_accepto.erl,r对于accept采用的是异步方式,利用prim_inet:async_accept/2方法,此模块没有被文档化,是otp库内部使用,通常来说没必要使用这一模块,gen_tcp:accept/1已经足够,不过rabbitmq是广播程序,因此采用了异步方式。使用async_accept,需要打patch,以使得socket好像我们从gen_tcp:accept/1得到的一样:& R8 [% X$ F: k; q. ^% u
' w; L/ {% _9 o, x8 P- J
handle_info({inet_async, LSock, Ref, {ok, Sock}},2 e& N: \$ F& U1 K3 o8 H  i7 C7 y
            State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
/ t3 m& H" m* U+ y3 d! y    %%这里做了patch
7 Q# v1 G- [9 w- }    %% patch up the socket so it looks like one we got from
4 U& n1 q% ~: i1 P4 p2 m1 J( ]; ~# l    %% gen_tcp:accept/1 : `. w" J4 f) h  I) r3 k
    {ok, Mod} = inet_db:lookup_socket(LSock),
% [8 S% P" Q; }0 a    inet_db:register_socket(Sock, Mod),7 M" M: P0 S( t/ S; }; R2 i1 R
5 Q( X3 h$ T) o. E* M( c
    try  k0 n* |# i1 I; K" z/ ?. U
        %% report' S3 ?# Y6 I# ^, e8 ~9 u
        {Address, Port}         = inet_op(fun () -> inet:sockname(LSock) end),( s3 }, u: c: {5 f- u+ |. F: K
        {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
! X5 P% P& p; Y  i! E9 A        error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
4 E* l7 ^+ E1 g5 d                              [inet_parse:ntoa(Address), Port,! L4 P: y' ^8 V0 d8 H: s/ F
                               inet_parse:ntoa(PeerAddress), PeerPort]),4 a6 N3 X' Y* L- n
        %% 调用回调模块,将Sock作为附加参数! \& e/ l" R; j( Q8 M+ J5 [! d4 w
        apply(M, F, A ++ [Sock])
# M" e( q: f' R0 I. F- R3 l    catch {inet_error, Reason} ->
/ e6 P2 s6 L* i7 D2 G            gen_tcp:close(Sock),
4 ^$ X0 |+ A" [& ?: Q; e            error_logger:error_msg("unable to accept TCP connection: ~p~n",
+ \0 I9 Y* n9 Z  {0 N8 v                                   [Reason])
# U# b' W* Z; X8 X& [6 f    end,
6 Q! b( D  N7 S/ W6 _; P3 G" W6 k: F# F1 f  y
    %% 继续发起异步调用' g$ |1 I1 v" L, d# T
    case prim_inet:async_accept(LSock, -1) of% f. v* y! U5 R: H
        {ok, NRef} -> {noreply, State#state{ref=NRef}};
2 T) G8 P7 v0 U! d        Error -> {stop, {cannot_accept, Error}, none}
) U/ X. X+ d/ z    end;
/ n/ [- Q/ G& a% _2 l%%处理错误情况
0 z3 u9 L- k9 P. ~0 mhandle_info({inet_async, LSock, Ref, {error, closed}},
$ {5 {' f; U& `# H6 L. [* I            State=#state{sock=LSock, ref=Ref}) ->0 k# s! g) `0 f# }" \2 X; I
    %% It would be wrong to attempt to restart the acceptor when we8 V1 X! V7 O+ x9 c: a
    %% know this will fail.
1 C& m5 Y8 S2 @' n( g6 t    {stop, normal, State};) {* B% n/ ^6 R4 f- B+ j9 {$ z$ A

7 }8 ?+ B0 m5 e+ r; `6 _2、rabbitmq内部是使用了多个并发acceptor,这在高并发下、大量连接情况下有效率优势,类似java现在的nio框架采用多个reactor类似,查看tcp_listener.erl:
0 ?3 \( Y7 |; |9 _5 m$ q
8 ?  ^( u# Q1 Einit({IPAddress, Port, SocketOpts,0 U" I( N3 [- `; u  b/ }
      ConcurrentAcceptorCount, AcceptorSup,
% P: B" Z9 @/ E- o. W8 ?+ A      {M,F,A} = OnStartup, OnShutdown, Label}) ->* [- o+ i3 C. x! F  y! ^# a
    process_flag(trap_exit, true),9 `- O" I: E  r2 o2 j$ ~/ v8 I
    case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},  j. k1 Y! M* s. s8 w
                                             {active, false}]) of$ [) @6 E; ^0 }# n) i
        {ok, LSock} ->
. h$ X6 Y+ K( E; }; T* u. B0 I! P             %%创建ConcurrentAcceptorCount个并发acceptor# ~" l" C) T4 b; l
            lists:foreach(fun (_) ->- p* ?3 a( r6 ?5 V7 E
                                  {ok, _APid} = supervisor:start_child(! R# \  H3 @( x1 h
                                                  AcceptorSup, [LSock])
0 [6 V3 V' h; T9 s+ I8 e6 P                          end,
$ p+ x4 U& A% v1 G3 ^8 `                          lists:duplicate(ConcurrentAcceptorCount, dummy)),

( {6 D$ Q6 g- K            {ok, {LIPAddress, LPort}} = inet:sockname(LSock),/ Y4 F5 G6 M6 ?8 |+ T
            error_logger:info_msg("started ~s on ~s:~p~n",
( d5 O/ i0 j1 b  I$ \                                  [Label, inet_parse:ntoa(LIPAddress), LPort]),
+ ]! ]2 q( C* z1 ]- Q" L3 k6 P. x            %%调用初始化回调函数
9 X8 m" F# J) U* Y* d; L8 O            apply(M, F, A ++ [IPAddress, Port]),0 O3 N! ]0 Y/ S+ L  _
            {ok, #state{sock = LSock,
7 e: ?- @$ Z$ U9 I/ @, L0 L                        on_startup = OnStartup, on_shutdown = OnShutdown,
  t" P: X+ _8 S                        label = Label}};; r1 ]# T; z# ?1 r% K$ A  E( }
        {error, Reason} ->
# n0 v& d% {! t            error_logger:error_msg() S& e) J3 i, h( p& Z5 v
              "failed to start ~s on ~s:~p - ~p~n",
* L3 [% m8 c. ^. M              [Label, inet_parse:ntoa(IPAddress), Port, Reason]),9 p5 W* O2 A& i% E0 Y. n  f5 Z4 O8 T
            {stop, {cannot_listen, IPAddress, Port, Reason}}8 Z, x" q5 b- J% G- h
    end.
2 j+ `2 ]/ L6 d: w% m5 |7 x& W+ M1 @  R+ m) I) p) Q
这里有一个技巧,如果要循环N次执行某个函数F,可以通过lists:foreach结合lists:duplicate(N,dummy)来处理。' g) @3 o7 G! d- a
8 p6 J; m0 ]3 r
lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).: p  c, N, h" V! ~6 `
- P* }* q8 j! X4 V9 N
3、simple_one_for_one策略的使用,可以看到对于tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,这是为什么呢?
: j* n4 \8 ^! @. H  g这牵扯到simple_one_for_one的几个特点:. J0 A6 L+ N8 T9 i0 A. j& t
1)simple_one_for_one内部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更适合child频繁创建销毁、需要大量child进程的情况,具体来说例如网络连接的频繁接入断开。% s6 {3 T( ^( A+ `  C7 o
2)使用了simple_one_for_one后,无法调用terminate_child/2 delete_child/2 restart_child/2 & t6 v6 [: A) r: x) r) M7 A, C

, M0 I0 x( K/ ^
3)start_child/2 对于simple_one_for_one来说,不必传入完整的child spect,传入参数list,会自动进行参数合并在一个地方定义好child spec之后,其他地方只要start_child传入参数即可启动child进程,简化child都是同一类型进程情况下的编程
9 ?, s' y9 a) g3 A* U' v( J
5 D0 @" V4 u, t在 rabbitmq中,tcp_acceptor_sup的子进程都是tcp_acceptor进程,在tcp_listener中是启动了 ConcurrentAcceptorCount个tcp_acceptor子进程,通过supervisor:start_child/2方法:
3 d( P: h: U# `9 S0 T& x3 C( P: p' o( n
%%创建ConcurrentAcceptorCount个并发acceptor
% p2 E) ?. s9 g            lists:foreach(fun (_) ->! A/ a. v  d1 C
                                  {ok, _APid} = supervisor:start_child() U5 v0 u6 E' B* y; ~: i9 E$ }1 U
                                                  AcceptorSup, [
LSock])% }; y7 [, M6 E  g: d4 s/ Z% [+ U8 K& f
                          end,$ Q& Q2 n( P4 F7 L) z
                          lists:duplicate(ConcurrentAcceptorCount, dummy)),3 Y3 b/ `0 t2 W6 B- D( t1 ?

3 d: p" [0 h9 v+ v' X4 [注意到,这里调用的start_child只传入了LSock一个参数,另一个参数CallBack是在定义child spec的时候传入的,参见tcp_acceptor_sup.erl:
/ P/ V( \& v- |2 {init(Callback) ->
. _& F' F7 U5 K6 u    {ok, {{simple_one_for_one, 10, 10},, y3 ~% ^4 L1 h% t, X, t2 n0 z  F& ]
          [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},2 ~1 `* G/ A* O' @" N( G1 @
            transient, brutal_kill, worker, [tcp_acceptor]}]}}., ?2 g) @$ A0 U& @3 S6 Y* q% v2 D: e

/ r; Q' U  F5 Y3 l1 aErlang内部自动为simple_one_for_one做了参数合并,最后调用的是tcp_acceptor的init/2:
' P/ z) {7 D: }8 p. U# V& `# u( V' X1 E4 P
init({Callback, LSock}) ->
$ n5 }* V2 J& e- w" v7 a1 Q; h; C    case prim_inet:async_accept(LSock, -1) of6 D& I, }, S0 u  y, y. p
        {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};8 o/ L3 x, T! H( |
        Error -> {stop, {cannot_accept, Error}}: L8 p! U) S2 v9 C& t8 A
    end.
/ u/ p2 B' a9 u* d# |- Z' C6 |4 h3 d# h" b" ?  n
对于tcp_client_sup的情况类似,tcp_client_sup监控的子进程都是rabbit_reader类型,在 rabbit_networking.erl中启动tcp_listenner传入的处理connect事件的回调方法是是 rabbit_networking:start_client/1:8 n. n! S. W7 n# S' W" ]( B! t

& ^" e/ X/ V7 C$ \7 ~start_tcp_listener(Host, Port) ->* x% S6 Z  e) y; }- b
    start_listener(Host, Port, "TCP Listener",
( E3 Z, J, s' ^- c' M) ~$ w                   %回调的MFA
8 u. }" y' ~$ D$ W) G# w                   {?MODULE, start_client, []}).$ E% f( u. a2 M% ]# l5 s2 g/ A

; `1 i4 v3 r. K3 v/ bstart_client(Sock) -># _* u% T. e" t6 A# [
    {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),9 c( [7 K7 w' ~1 c8 @4 t
    ok = rabbit_net:controlling_process(Sock, Child),
4 P9 ]& |; t* m6 U  c( }+ j% a) G    Child ! {go, Sock},
! N  |% w( r4 ]    Child.
" h0 G% x# S; j( t, v  [/ s+ D0 l9 f; t% D
start_client调用了supervisor:start_child/2来动态启动rabbit_reader进程。
! h6 z9 Q, G+ v0 e: u( S' H  @- v2 e8 J, u0 n
4、协议的解析,消息的读取这部分也非常巧妙,这一部分主要在rabbit_reader.erl中,对于协议的解析没有采用gen_fsm,而是实现了一个巧妙的状态机机制,核心代码在mainloop/4中:) j' o  T3 ?7 m  R
%启动一个连接
* B. D* r- U: F3 i& `1 U4 Xstart_connection(Parent, Deb, ClientSock) ->. v  _% j* a, O6 ?  h
    process_flag(trap_exit, true),, d) V5 S) O5 w; o: I, r  x
    {PeerAddressS, PeerPort} = peername(ClientSock),) I3 i! \9 J, K7 F! `! _1 L5 K
    ProfilingValue = setup_profiling(),( B) L& g, ^4 m: W. c# r  Q! m1 p
    try
5 \0 ^' H) ?  F        rabbit_log:info("starting TCP connection ~p from ~s:~p~n",4 B/ U* V+ p! }0 B+ M
                        [self(), PeerAddressS, PeerPort]),  m0 W3 |  b/ y  A) u. o+ F
         %延时发送握手协议5 j/ l: I7 U- f4 K1 ?3 N8 l6 F2 [& E
        Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),, {1 i" g% F0 y* a- c  V: e7 }2 ?( }+ }
                          handshake_timeout),# }: T9 w7 t# z
        %进入主循环,更换callback模块,魔法就在这个switch_callback9 A& @2 f% X# S- G8 r
        mainloop(Parent, Deb, switch_callback(
" Q: h* T7 d% v. \! L                                #v1{sock = ClientSock,% n* M/ W3 c7 o% i
                                    connection = #connection{! x$ b8 E3 \  k! |
                                      user = none,( h. A* ?% P* x) H; H
                                      timeout_sec = ?HANDSHAKE_TIMEOUT,
& f! K2 W9 R; R/ W* U                                      frame_max = ?FRAME_MIN_SIZE,
1 J9 O# O! m# q, x; A! s4 V" l5 M                                      vhost = none},' r  L/ @7 }) {' l
                                    callback = uninitialized_callback,
) U. @/ q( X4 r6 N' l                                    recv_ref = none,
$ G& M% B5 }  E  O- [6 O                                    connection_state = pre_init},* _* I5 O: k0 A
                                %%注意到这里,handshake就是我们的回调模块,8就是希望接收的数据长度,AMQP协议头的八个字节。  ~; v. T# I4 ^' Z, m6 w1 ~' ]
                                handshake, 8))
+ b+ @. x0 Z2 F4 p. N9 g9 R4 ^6 X6 |. q4 e7 Q
魔法就在switch_callback这个方法上:; X3 G( ?' p' [
switch_callback(OldState, NewCallback, Length) ->% r; ]* J& }# W1 J
    %发起一个异步recv请求,请求Length字节的数据5 w1 @3 T4 q& M6 |& n
    Ref = inet_op(fun () -> rabbit_net:async_recv(4 ]/ m$ K) y/ K. X( h+ F
                              OldState#v1.sock, Length, infinity) end),
6 ?* R1 _1 p6 d  d0 s7 r0 B8 d& _    %更新状态,替换ref和处理模块. Y; y, T* R, X6 p% |
    OldState#v1{callback = NewCallback,$ d$ j# Z( I. A9 X6 O
                recv_ref = Ref}.

- d* \# I9 ~4 v0 F2 H7 ?
+ `4 v, u5 e) _. K) `; @+ x6 y) r0 M异步接收Length个数据,如果有,erlang会通知你处理。处理模块是什么概念呢?其实就是一个状态的概念,表示当前协议解析进行到哪一步,起一个label的作用,看看mainloop/4中的应用:6 x0 P& E2 a' T- U
* Q, T, W) ?; g1 b8 K
mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->1 T' m4 c+ h8 K9 h6 l+ I
    %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
9 C/ t# ?. Q' a( w' C1 X8 K    receive1 F4 }) ?; k* \; T" S; h% D
        %%接收到数据,交给handle_input处理,注意handle_input的第一个参数就是callback: `- x5 r; q/ _9 }3 n
        {inet_async, Sock, Ref, {ok, Data}} ->
1 b0 \/ F5 P: {' v+ S- b+ t# _            %handle_input处理
" b( x, _! m+ r% S* i1 n! r            {State1, Callback1, Length1} =
$ v! V% Z' O4 Y' b# w" b4 U                handle_input(State#v1.callback, Data,
* z/ D( I! x0 W- J) ^# A( P) l1 U                             State#v1{recv_ref = none}),

% m# G* V3 S  W            %更新回调模块,再次发起异步请求,并进入主循环
" h8 K( G9 L6 {2 m7 `8 R4 p            mainloop(Parent, Deb,8 W% f' I3 o& L) k8 ?
                     switch_callback(State1, Callback1, Length1));% k2 d! X* G) p$ B
( P& \. J/ u0 Z. z: J7 w
/ H, K6 z! D8 U4 O4 Z) D9 x  N
handle_input有多个分支,每个分支都对应一个处理模块,例如我们刚才提到的握手协议:  C9 d  N) |6 D" a- _

( T8 E6 U+ Y. c6 `; G( l  e%handshake模块,注意到第一个参数,第二个参数就是我们得到的数据/ D! q" P6 U/ h: J
handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,$ Z7 }% _5 K) c+ f
             State = #v1{sock = Sock, connection = Connection}) ->' W3 ^. ?  V. C& {8 |
     %检测协议是否兼容
/ j& }! x- D$ ]/ @+ D" t  M    case check_version({ProtocolMajor, ProtocolMinor},
% s! z- [3 ]9 J8 z6 K1 M9 i                       {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
+ j1 V* q! D2 e7 R8 C        true ->0 U9 r; o* t# x' d% J
            {ok, Product} = application:get_key(id)," E5 h7 w! k  ^: H/ L: F
            {ok, Version} = application:get_key(vsn),  d$ A: K, ~4 T$ a) T3 i" b8 v
            %兼容的话,进入connections start,协商参数, m' e$ t3 V8 S& p* O" @- o" C, c" d
            ok = send_on_channel0(
' @8 f. E. j2 C* J/ P                   Sock,
+ _- o' P, j: ~3 A- a+ e                   #'connection.start'{) z0 I% g) I# `( f4 o2 v; V& g# ~7 {
                     version_major = ?PROTOCOL_VERSION_MAJOR,
9 x3 u; t* M8 ]# B                     version_minor = ?PROTOCOL_VERSION_MINOR,
$ A  q5 a  e# e3 E1 j# I  J8 Y                     server_properties =% G- N; S. J# u4 h  q
                     [{list_to_binary(K), longstr, list_to_binary(V)} ||
* E, H4 e. c& r( c0 |- C                         {K, V} <-; h3 \9 y1 c7 N# m9 r
                             [{"product",     Product},' @% i2 \% L1 G* Z9 v
                              {"version",     Version},
0 l& s5 T5 @9 c4 e5 l) e' J                              {"platform",    "Erlang/OTP"},( q3 |* ~2 ^, p: \
                              {"copyright",   ?COPYRIGHT_MESSAGE}," A' X" p6 W4 F( N
                              {"information", ?INFORMATION_MESSAGE}]],
' R- V1 w; Z; r( D2 G                     mechanisms = <<"PLAIN AMQPLAIN">>,
, `2 [7 Y, J. O9 \0 v6 \) Y" V4 w                     locales = <<"en_US">> }),. ?: X3 E) T8 F: m# o  l
            {State#v1{connection = Connection#connection{" b  g9 X& V4 p" q
                                     timeout_sec = ?NORMAL_TIMEOUT},
0 L" ^% m: c! ?5 v3 O                      connection_state = starting},
$ R6 i" _, y3 c: ~4 L$ l; g             frame_header, 7};
, f% L% h3 j& c; ?( O, O, n/ }. X         %否则,断开连接,返回可以接受的协议) q3 @. b/ ]' d" q! z  F) Z8 ]5 [- Y
        false ->
7 i$ n/ ?4 Q6 f' P+ L            throw({bad_version, ProtocolMajor, ProtocolMinor})
6 A) r: q: h6 _* t4 u+ ^. w    end;
" H+ a. Y$ m. A8 W! y+ Z" Q6 h+ b0 U& s4 U" {& Z
    其他协议的处理也是类似,通过动态替换callback的方式来模拟状态机做协议的解析和数据的接收,真的很巧妙!让我们体会到Erlang的魅力,FP的魅力。$ b( I% d2 \8 c% ?

2 P  _2 a: b5 C' S7 e3 ~! {( C* L5、序列图:% `& a# S* E. A8 I; V
1)tcp server的启动过程:$ L; y  a7 \1 _
3.png
! G' E- F5 G: F( c. ^
2 G0 x, [2 N( `/ ^, G
# r" c5 j7 M' j2 d
2)一个client连接上来的处理过程:: e1 W/ q0 G9 ]1 M' c/ H. R; ~
4.png 0 X1 Y6 |" ?4 S* s
/ Q" @/ V& M  d7 Y! [

8 [8 l0 V2 {. V6 r. Y0 E
) W) l: F, R% e3 o+ Y, h    小结:从上面的分析可以看出,rabbitmq的网络层是非常健壮和高效的,通过层层监控,对每个可能出现的风险点都做了考虑,并且利用了 prime_net模块做异步IO处理。分层也是很清晰,将业务处理模块隔离到client_sup监控下的子进程,将网络处理细节和业务逻辑分离。在协议的解析和业务处理上虽然没有采用gen_fsm,但是也实现了一套类似的状态机机制,通过动态替换Callback来模拟状态的变迁,非常巧妙。如果你要实现一个tcp server,强烈推荐从rabbitmq中扣出这个网络层,你只需要实现自己的业务处理模块即可拥有一个高效、健壮、分层清晰的TCP服务器。
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:13
好文章,赞一个1 \! C  N1 U6 G! Z
$ w( O2 b1 v3 w2 C. h+ U: {
其实在erlang driver里面accept操作肯定是async的,或者说是nonblocking的,无论你调用的是accept还是async_accept,这两者的唯一区别,只是accept在prim_inet模块里帮你receive {inet_async,...}这个消息,而async_accept在prim_inet里是通过port_control后直接就返回了
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:13
写的不错。。。。, rabbitmq整个细节都做的非常好,不仅仅是网络这块。。。
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:13
dennis大大啊,小弟刚学rabbitmq,请问如何通过http的方式来访问rabbitmq-server,还有如何在rabbitmq-server上注册一个context,就像这样访问http://localhost:5672/xxx?xxxxxxxxxx
. i( `1 r" J& |* [* h" S4 v, @, ?% E! U1 b
我反遍整个网络都没人介绍,YM。。。。
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:13
你要http接口?自己做个proxy吧,找个好用的amqp客户端,把那些操作按url挂接到一个webserver上就行了,另外rabbitmq的erlang客户端貌似不太好用,hg clone下来的一链接就挂起,来的看后来自己写了个(比较粗糙,就算在一个vm里也要走tcp连接)……
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:13
其实我现在想做的是http的长连接,我这里有个java client往rabbitmq publish数据,然后rabbitmq收到数据能推送到browser。如果browser端挂到某个webserver,那它负载太大了。。最好browser端能通过ajax长轮询方式直接挂到rabbitmq,最大限度利用erlang的并行特性
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:14
我对rabbitmq的了解不必你多,除了稍微接触点源码外,对使用功能并没有做更多探索,暂时帮不上忙。看了后面的讨论,如果rabbitmq本身没有支持web访问的话,也许挂个mochiweb在前端代理可以解决。
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:14
那就把mochiweb之类的跟rabbitmq整一块,中间搭个桥啊,这个桥一方面是amqp客户端,一方面是mochiweb里的程序,还可以把这些都跑到同一个erlang vm里,我现在有个类似的,不过不是对接的mochiweb之类的http server...
Techweb技术网友
Techweb技术网友  发表于 2009-12-15 14:14
用ejabberd吧,里面提供了http方式长连接,或者你参考他的实现。
您需要登录后才可以回帖 登录 | 注册

本版积分规则

© 2001-2014Comsenz Inc.

快速回复 返回顶部 返回列表