前言

介绍

[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。

zeromq的英文文档
NetMQ的英文文档

目的

对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

  1. 消息队列NetMQ 原理分析1-Context和ZObject

  2. 消息队列NetMQ 原理分析2-IO线程和完成端口

  3. 消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程

  4. 消息队列NetMQ 原理分析4-Session、Option和Pipe

  5. 消息队列NetMQ 原理分析5-Engine

  6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现

  7. 消息队列NetMQ 原理分析7-Device

  8. 消息队列NetMQ 原理分析8-不同类型的Socket

  9. 消息队列NetMQ 原理分析9-实战

友情提示: 看本系列文章时最好获取源码,更有助于理解。


命令

命令结构

Command定义如下

internal struct Command
{    public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()    {
        Destination = destination;
        CommandType = type;
        Arg = arg;
    }
    [CanBeNull]    public ZObject Destination { get; }    public CommandType CommandType { get; }
    [CanBeNull]    public object Arg { get; private set; }        
    public override string ToString()    {        return base.ToString() + "[" + CommandType + ", " + Destination + "]";
    }
}

其包含了3个信息:调用者,命令类型和命令参数。

命令产生

还记的《消息队列NetMQ 原理分析1-Context和ZObject》中我们介绍过NetMQ中的命令类型吗?待处理命令全部会存放着Socket的信箱中。当Socket有命令(连接完成、发送完成或接受完成等)需要处理时调用基类ZObjectSendCommand方法。

private void SendCommand([NotNull] Command cmd){
    m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
}

万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

ZObject实际调用Context的SendCommand方法

public void SendCommand(int threadId, [NotNull] Command command){
    m_slots[threadId].Send(command);
}

m_slots[threadId]保存的是当前IO线程的IO信箱IOThreadMailbox,在《消息队列NetMQ 原理分析2-IO线程和完成端口》
我们简单介绍了IOThreadMailbox的结构。

[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");

IOThreadMailbox中维护这一个Command管道,该管道实际就是一个先进先出队列,详细解析会在第四章进行介绍。

public void Send(Command command){    bool ok;    lock (m_sync)
    {        //向管道写入命令
        m_commandPipe.Write(ref command, false);        //成功写入会返回false,表示有命令需要处理
        ok = m_commandPipe.Flush();
    }    if (!ok)
    {        //向完成端口传递信号
        m_proactor.SignalMailbox(this);
    }
}public bool TryRecv(out Command command){    return m_commandPipe.TryRead(out command);
}public void RaiseEvent(){    if (!m_disposed)
    {
        m_mailboxEvent.Ready();
    }
}

IOThreadMailbox的主要就是这三个方法

  1. 当有命令来的时候调用Send方法向管道(队列)写入命令。写完时,会向完成端口传递信号。

  2. 当有命令需要处理时调用TryRecv方法读取

  3. 当完成端口接收到信号需要命令处理时,调用RaiseEvent(实际是信箱的IO线程的RaiseEvent方法)进行处理命令。

public void SignalMailbox(IOThreadMailbox mailbox){    //该方法会向完成端口的队列中插入一个信号状态
    m_completionPort.Signal(mailbox);
}

有关于完成端口介绍请查看《消息队列NetMQ 原理分析2-IO线程和完成端口》

命令处理

当有命令需要处理时,完成端口会接收到信号。

private void Loop(){
    ...    int timeout = ExecuteTimers();    int removed;    if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))        continue;    for (int i = 0; i < removed; i++)
    {        try
        {            if (completionStatuses[i].OperationType == OperationType.Signal)
            {                var mailbox = (IOThreadMailbox)completionStatuses[i].State;
                mailbox.RaiseEvent();
            }
            ...
        }
        ...
    }
    ...
}

在线程轮询方法Loop中,当接收到需要处理的数据时,首先会判断是否是信号,若为信号,则将状态(参数)转化为IOThreadMailbox类型,同时调用RaiseEvent方法处理命令。

public void Ready(){
    Command command;    while (m_mailbox.TryRecv(out command))
        command.Destination.ProcessCommand(command);
}

当有命令需要处理时,会调用IOThreadMailboxTryRecv方法从管道(队列,先进先出)中获取第一个命令进行处理。

创建Socket(SocketBase)

在介绍回收线程工作之前,我们先看下创建一个新的Socket做了哪些工作,这里的Socket实际是NetMQ中的SocketBase

RequestSocket socket = new RequestSocket();socket.Connect("tcp://127.0.0.1:12345");

NetMQSocket是NetMQ的Socket的基类。

public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect){

}
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction){
    m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
    m_netMqSelector = new NetMQSelector();
    Options = new SocketOptions(this);
    m_socketEventArgs = new NetMQSocketEventArgs(this);

    Options.Linger = NetMQConfig.Linger;    if (!string.IsNullOrEmpty(connectionString))
    {        var endpoints =
            connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
                .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));        foreach (string endpoint in endpoints)
        {            if (endpoint[0] == '@')
            {
                Bind(endpoint.Substring(1));
            }            else if (endpoint[0] == '>')            {
                Connect(endpoint.Substring(1));
            }            else if (defaultAction == DefaultAction.Connect)            {
                Connect(endpoint);
            }            else
            {
                Bind(endpoint);
            }
        }
    }
}

首先会根据Socket的类型创建对应的Socket,调用的是ContextCreateSocket方法。具体的请看创建SocketBase。最终创建方法是调用SocketBaseCreate方法

public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId){    switch (type)
    {
        ...        case ZmqSocketType.Req:            return new Req(parent, threadId, socketId);
        ...        default:            throw new InvalidException("SocketBase.Create called with invalid type of " + type);
    }
}

创建完后,就对地址进行解析。若有多个地址,则可用,分隔。

var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
    .Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));

解析完成后则用默认的方式进行绑定或连接,如RequestSocket默认为连接,而ResponseSocket则为绑定。

创建连接

  1. 首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》

    private static void DecodeAddress([NotNull] string addr, out string address, out string protocol){    const string protocolDelimeter = "://";    int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
    
        protocol = addr.Substring(0, protocolDelimeterIndex);
        address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
    }
  2. 负载均衡选择一个IO线程。

  3. 创建Session,SocketSession的关系如图所示
    万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训

  4. 创建管道,创建管道会创建一对单向管道,形成“一个”双向管道。头尾分别连接SocketSession,如上图所示。创建管道完毕后需要设置管道的回调事件,管道1设置回调为Socket的回调方法,管道2设置为Session的回调方法。

具体关于SessionPipe的内容请查看《消息队列NetMQ 原理分析4-Session、Option和Pipe》

  1. 处理SocketSession的关系

protected void LaunchChild([NotNull] Own obj){    // Specify the owner of the object.
    obj.SetOwner(this);    // Plug the object into the I/O thread.
    SendPlug(obj);    // Take ownership of the object.
    SendOwn(this, obj);
}
  • Session的宿主设置为该Socket

    private void SetOwner([NotNull] Own owner){
    Debug.Assert(m_owner == null);
    m_owner = owner;
    }
  • 为IO对象设置Session,当管道有数据交互时,Session的回调方法就会触发。

    protected void SendPlug([NotNull] Own destination, bool incSeqnum = true){if (incSeqnum)
        destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Plug));
    }

    SessionBaseProcessPlug会被触发

    protected override void ProcessPlug(){
    m_ioObject.SetHandler(this);if (m_connect)
        StartConnecting(false);
    }
  • 将当前Session加入到SocketSession集合中,

    protected void SendOwn([NotNull] Own destination, [NotNull] Own obj){
    destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Own, obj));
    }

    SocketBase的父类方法SendOwn(Own方法)方法会被触发,将Session加入到集合中

    protected override void ProcessOwn(Own obj){
    ...// Store the reference to the owned object.m_owned.Add(obj);
    }

    创建绑定

  1. 首先对地址进行解析,判断当前是tcp还是其他协议。然后会根据协议类型创建对应的Socket,具体的协议类型分析请查看《消息队列NetMQ 原理分析6-TCP和Inpoc实现》

    private static void DecodeAddress([NotNull] string addr, out string address, out string protocol){    const string protocolDelimeter = "://";    int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal);
    
        protocol = addr.Substring(0, protocolDelimeterIndex);
        address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length);
    }
  2. 负载均衡选择一个IO线程。

  3. 处理SocketSession的关系

protected void LaunchChild([NotNull] Own obj){    // Specify the owner of the object.
    obj.SetOwner(this);    // Plug the object into the I/O thread.
    SendPlug(obj);    // Take ownership of the object.
    SendOwn(this, obj);
}
  • Listener的宿主设置为该Socket

    private void SetOwner([NotNull] Own owner){
    Debug.Assert(m_owner == null);
    m_owner = owner;
    }
  • 为IO对象设置Listener,当管道有数据交互是,Listener的回调方法就会触发。

    protected void SendPlug([NotNull] Own destination, bool incSeqnum = true){if (incSeqnum)
        destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Plug));
    }

    ListenerProcessPlug会被触发

    protected override void ProcessPlug(){
    m_ioObject.SetHandler(this);
    m_ioObject.AddSocket(m_handle);//接收异步socketAccept();
    }
  • 将当前Listener加入到SocketListener集合中,

    protected void SendOwn([NotNull] Own destination, [NotNull] Own obj){
    destination.IncSeqnum();
    SendCommand(new Command(destination, CommandType.Own, obj));
    }

    SocketBase的父类方法SendOwn(Own方法)方法会被触发,将Listener加入到集合中

    protected override void ProcessOwn(Own obj){
    ...// Store the reference to the owned object.m_owned.Add(obj);
    }

    SocketBase的创建处理就完成了

回收线程

(垃圾)回收线程是专门处理(清理)异步关闭的Socket的线程,它在NetMQ中起到至关重要的作用。

internal class Reaper : ZObject, IPollEvents{
   ... 
}

Reaper是一个ZObject对象,同时实现了IPollEvents接口,该接口的作用是当有信息接收或发送时进行处理。回收线程实现了InEvent方法。

internal interface IPollEvents : ITimerEvent{    void InEvent();    void OutEvent();
}

InEvent方法实现和IO线程的Ready方法很像,都是遍历需要处理的命令进行处理。

public void InEvent(){    while (true)
    {
        Command command;        if (!m_mailbox.TryRecv(0, out command))            break;
        command.Destination.ProcessCommand(command);
    }
}

初始化回收线程

public Reaper([NotNull] Ctx ctx, int threadId)
    : base(ctx, threadId){
    m_sockets = 0;
    m_terminating = false;    string name = "reaper-" + threadId;
    m_poller = new Utils.Poller(name);

    m_mailbox = new Mailbox(name);

    m_mailboxHandle = m_mailbox.Handle;
    m_poller.AddHandle(m_mailboxHandle, this);
    m_poller.SetPollIn(m_mailboxHandle);
}
  1. 初始化回收线程是会创建一个Poller对象,用于轮询回收SocketBase


}


http://www.cnblogs.com/Jack-Blog/p/6774902.html