前言
介绍
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。 当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
目的
对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:
消息队列NetMQ 原理分析5-Engine,Encord和Decord
消息队列NetMQ 原理分析6-TCP和Inpoc实现
消息队列NetMQ 原理分析7-Device
消息队列NetMQ 原理分析8-不同类型的Socket
消息队列NetMQ 原理分析9-实战
友情提示: 看本系列文章时最好获取源码,更有助于理解。
Socket
上一章最后我们简单介绍了SocketBase
和SessionBase
的创建和回收,这一张我们详细介绍SocketBase
和SessionBase
。
首先SocketBase
继承自Own
,即也是ZObject
对象,同时由于SocketBase
需要进行消息的传输,因此它实现了一些结构,包括IPollEvents
、Pipe.IPipeEvents
。
接口实现
internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{ ... }
IPollEvents
事件上一章回收线程已经介绍过,这里不再做过多说明了,简单讲SocketBase
实现该事件只有在回收线程回收Socket
的时候会触发。Pipe.IPipeEvents
:是管道事件,它的签名如下public interface IPipeEvents{void ReadActivated([NotNull] Pipe pipe);void WriteActivated([NotNull] Pipe pipe);void Hiccuped([NotNull] Pipe pipe);void Terminated([NotNull] Pipe pipe); }
ReadActivated
:表示管道可读,管道实际调用SocketBase
或SessionBase
的ReadActivated
方法,而SocketBase
实际会调用XReadActivated
方法。WriteActivated
:表示管道可写,管道实际调用SocketBase
或SessionBase
的WriteActivated
方法,而SocketBase
实际会调用XWriteActivated
方法。Hiccuped
:当连接突然中断时会调用此方法。WriteActivated
:表示管道终止。
内部结构
SocketBase
的内部维护着一个字段,用于存放连接/绑定地址和它的管道(若当前SocketBase
是TCPListener
,则无需初始化管道,管道为空)。
private readonly Dictionary m_endpoints = new Dictionary();private readonly Dictionary m_inprocs = new Dictionary();
Endpoint
对象用于存放SessionBase
和Pipe
或Listener
的引用
private class Endpoint{ public Endpoint(Own own, Pipe pipe) { Own = own; Pipe = pipe; } public Own Own { get; } public Pipe Pipe { get; } }
当SocketBase
连接或绑定最后会向将Endpoint
保存到字典中
private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe){ LaunchChild(endpoint); m_endpoints[address] = new Endpoint(endpoint, pipe); }
在SocketBase
断开连接时会移除它
public void TermEndpoint([NotNull] string addr){ ... if (protocol == Address.InProcProtocol) { ... m_inprocs.Remove(addr); } else { ... m_endpoints.Remove(addr); } }
m_inprocs
也是一个字典用于存放inproc
协议的连接。
在第一章创建SocketBase我们介绍了Context
创建SocketBase
所做的一些工作,初始化SocketBase
时,会创建MailBox,用于传输Command
。
protected SocketBase([NotNull] Ctx parent, int threadId, int socketId) : base(parent, threadId){ m_options.SocketId = socketId; m_mailbox = new Mailbox("socket-" + socketId); }
每个
SocketBase
的命令处理实际都是在工作线程中进行。因此理论上(忽略线程上下文切换时造成的性能损失)线程数越多,NetMQ
的IO吞吐量和工作线程数成正比关系。
在Context
创建SocketBase
会根据Create
静态方法根据不同类型创建不同的SocketBase
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId) { switch (type) { case ZmqSocketType.Pair: return new Pair(parent, threadId, socketId); case ZmqSocketType.Pub: return new Pub(parent, threadId, socketId); case ZmqSocketType.Sub: return new Sub(parent, threadId, socketId); case ZmqSocketType.Req: return new Req(parent, threadId, socketId); case ZmqSocketType.Rep: return new Rep(parent, threadId, socketId); case ZmqSocketType.Dealer: return new Dealer(parent, threadId, socketId); case ZmqSocketType.Router: return new Router(parent, threadId, socketId); case ZmqSocketType.Pull: return new Pull(parent, threadId, socketId); case ZmqSocketType.Push: return new Push(parent, threadId, socketId); case ZmqSocketType.Xpub: return new XPub(parent, threadId, socketId); case ZmqSocketType.Xsub: return new XSub(parent, threadId, socketId); case ZmqSocketType.Stream: return new Stream(parent, threadId, socketId); default: throw new InvalidException("SocketBase.Create called with invalid type of " + type); } }
具体创建SocketBase
的工作在上一章已经做了详细的介绍,这里不再复述。
Session
首先和SocketBase
一样,SessionBase
也继承自Own
,即也是ZObject
对象,同时由于SessionBase
和SocketBase
存在消息传输,所以它也实现了IPipeEvents
接口,同时它实现了IProactorEvents
接口,在消息收发是会接收到通知。SessionBase
一端和SocketBase
进行消息的通讯,另一端和Engine
存在消息通讯,它实现了IMsgSink
和IMsgSource
接口和Engine
进行消息传输。
internal class SessionBase : Own, Pipe.IPipeEvents, IProactorEvents, IMsgSink, IMsgSource{ }
internal interface IMsgSink{ /// /// 传输消息.成功时返回true. /// /// 将msg消息写入到管道中 bool PushMsg(ref Msg msg); }
internal interface IMsgSource{ /// /// 取一个消息。成功时返回,从管道获取消息写入msg参数中;若失败则返回false,将null写入到msg参数中。 /// /// 从管道获取消息写入Msg中 /// true if successful - and writes the message to the msg argument bool PullMsg(ref Msg msg); }
当
SocketBase
将消息写入到写管道时,对应的SessionBase
会从读管道读到SocketBase
写入的数据,然后将数据从管道取出生成一个Msg
,Engine
会和AsyncSocket
交互传输数据,关于Engine
下一章再做介绍。
Option
option
参数如下
Affinity
表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。Backlog
最大Socket
待连接数DelayAttachOnConnect
在创建连接时,延迟在Socket
和Session
之间创建双向的管道,默认创建连接时立即创建管道DelayOnClose
若为true
,则在Socket
关闭时Session
先从管道接收所有消息发送出去。
否则直接关闭,默认为true
。DelayOnDisconnect
若为true
,则在Pipe
通知我们中断时Socket
先将接收所有入队管道消息。
否则直接中断管道。默认为true
.Endianness
字节序,数据在内存中是高到低排还是低到高排。Identity
响应的Identity
,每个Identity
用于查找Socket
。Identiy
是一个重复的随机32位整形数字,转换为字节5位字节数组。每个消息的第一部分是Identity
,IdentitySize
1个字节用于保存Identity的长度。IPv4Only
Linger
当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道MaxMessageSize
每个消息包最大消息大小RawSocket
若设置为true,RouterSocket
可以接收非NetMQ
发送来的tcp
连接。
默认是false,Stream
在构造函数时会设置为true
,设置为true
时会将RecvIdentity
修改为false
(用NetMQ
接收其他系统发送来的Socket
请求应该用StreamSocekt
,否则由于应用层协议不一样可能会导致一些问题。)RecvIdentity
若为true,Identity
转发给Socket
。ReconnectIvl
设置最小重连时间间隔,单位ms。默认100msReconnectIvlMax
设置最大重连时间间隔,单位ms。默认0(无用)RecoveryIvl
PgmSocket
用的SendBuffer
发送缓存大小,设置底层传输Socket
的发送缓存大小,初始为0ReceiveBuffer
接收缓存大小,设置底层传输Socket
的接收缓存大小,初始为0SendHighWatermark
Socket
发送的管道的最大消息数,当发送水位达到最大时会阻塞发送。ReceiveHighWatermark
Socket
接收管道的最大消息数SendLowWatermark
Socket
发送低水位,消息的最小数量单位,每次达到多少消息数量才向Session管道才激活写事件。默认1000ReceiveLowWatermark
Socket
接收低水位,消息的最小数量单位,每次达到多少消息数量Session
管道才激活读事件。默认1000SendTimeout
Socket
发送操作超时时间TcpKeepalive
TCP保持连接设置,默认-1不修改配置TcpKeepaliveIdle
TCP心跳包在空闲时的时间间隔,默认-1不修改配置TcpKeepaliveIntvl
TCP心跳包时间间隔,默认-1不修改配置DisableTimeWait
客户端断开连接时禁用TIME_WAIT
TCP状态
Pipe
在上一章我们讲到过在SocketBase
和SessionBase
是通过2条单向管道进行消息传输,传输的消息单位是Msg
,消息管道是YPipe
类型,那么YPipe<>
又是什么呢?
YPipe
Ypipe
内部实际维护这一个YQueue
类型的先进先出队列,YPipe
向外暴露了一下方法:
TryRead
该方法用于判断当前队列是否可读,可读的话第一个对象出队public bool TryRead(out T value){if (!CheckRead()) { value = default(T); return false; }value = m_queue.Pop();return true; }
Unwrite
取消写入消息public bool Unwrite(ref T value){if (m_flushToIndex == m_queue.BackPos) return false;value = m_queue.Unpush();return true; }
写入消息
将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。public void Write(ref T value, bool incomplete){ m_queue.Push(ref value);// Move the "flush up to here" pointer.if (!incomplete) { m_flushToIndex = m_queue.BackPos; } }
完成写入
当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读public void Flush(){if (m_state == State.Terminating) return;if (m_outboundPipe != null && !m_outboundPipe.Flush()) SendActivateRead(m_peer); }
Msg
写入的消息单位是Msg
,它实现了多条数据的存储,当每次数据写完还有数据带写入时通过将Flag标记为More
表示消息还没写入完。
YQueue
YQueue
是由一个个trunk
组成的,每个trunk
就是一个消息块,每个消息块可能包含多个Msg
,主要由写入消息时是否还有更多消息带写入(Flag
)决定。trunk
是一个双向循环链表,内部维护着一个数组用于存放数据,每个数据会有2个指针,分别指向前一个块和后一个块,每个块还有一个索引,表示当前块在队列中的位置。
private sealed class Chunk{ public Chunk(int size, int globalIndex) { Values = new T[size]; GlobalOffset = globalIndex; Debug.Assert(Values != null); } /// 数据 public T[] Values { get; } /// 当前块在队列中的位置 public int GlobalOffset { get; } /// 前一个块 [CanBeNull] public Chunk Previous { get; set; } /// 下一个块 [CanBeNull] public Chunk Next { get; set; } }
每个chunk
默认最多可保存256个部分。
由于每次向SocketBase
写入的Msg
可能有多个部分,因此消息会写入到数组中,所有消息写完后指向trunk
的指针才会后移一位。YQueue
有以下字段
//用于记录当前块消息的个数,默认为256private readonly int m_chunkSize;// 当队列是空的时,下一个块指向null,首尾块都指向初始化的一个块,开始位置的块仅用于队列的读取(front/pop),最后位置的仅用于队列的写入(back/push)。// 开始位置private volatile Chunk m_beginChunk;//chunk的当前可读位置索引private int m_beginPositionInChunk;//指向后一个块private Chunk m_backChunk;//chunk的最后一个可读位置索引private int m_backPositionInChunk;//指向后一个块private Chunk m_endChunk;//chunk的下一个可写位置索引private int m_endPosition;//当达到最大Msg数量时,扩展一个chunk,最大为256个块private Chunk m_spareChunk; 当前trunk头部在整个队列中的的索引位置private int m_nextGlobalIndex;
YPipe
写入Msg
实际是向YQueue
入队
public void Push(ref T val){ m_backChunk.Values[m_backPositionInChunk] = val; //指向后一个块 m_backChunk = m_endChunk; //索引更新到最后可读位置 m_backPositionInChunk = m_endPosition; //下一个可写位置向后移动一位 m_endPosition++; if (m_endPosition != m_chunkSize) return; //到达最后一个位置则需要扩充一个块 Chunk sc = m_spareChunk; if (sc != m_beginChunk) { //已经扩充了块则更新下一个块的位置 m_spareChunk = m_spareChunk.Next; m_endChunk.Next = sc; sc.Previous = m_endChunk; } else { //新建一个块,并更新索引位置 m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex); m_nextGlobalIndex += m_chunkSize; m_endChunk.Next.Previous = m_endCh
http://www.cnblogs.com/Jack-Blog/p/7117798.html