<svg width="80" height="80" viewBox="0 0 250 250" style="fill: #222; color: #fff; position: absolute; top: 0; border: 0; right: 0;" aria-hidden="true"><path d="M0,0 L115,115 L130,115 L142,142 L250,250 L250,0 Z"></path><path d="M128.3,109.0 C113.8,99.7 119.0,89.6 119.0,89.6 C122.0,82.7 120.5,78.6 120.5,78.6 C119.2,72.0 123.4,76.3 123.4,76.3 C127.3,80.9 125.5,87.3 125.5,87.3 C122.9,97.6 130.6,101.9 134.4,103.2" fill="currentColor" style="transform-origin: 130px 106px;" class="octo-arm"></path><path d="M115.0,115.0 C114.9,115.1 118.7,116.5 119.8,115.4 L133.7,101.6 C136.9,99.2 139.9,98.4 142.2,98.6 C133.8,88.0 127.5,74.4 143.8,58.0 C148.5,53.4 154.0,51.2 159.7,51.0 C160.3,49.4 163.2,43.6 171.4,40.1 C171.4,40.1 176.1,42.5 178.8,56.2 C183.1,58.6 187.2,61.8 190.9,65.4 C194.5,69.0 197.7,73.2 200.1,77.6 C213.8,80.2 216.3,84.9 216.3,84.9 C212.7,93.1 206.9,96.0 205.4,96.6 C205.1,102.4 203.0,107.8 198.3,112.5 C181.9,128.9 168.3,122.5 157.7,114.1 C157.9,116.9 156.7,120.9 152.7,124.9 L141.0,136.5 C139.8,137.7 141.6,141.9 141.8,141.8 Z" fill="currentColor" class="octo-body"></path></svg>

消息队列设计思路

ActiveMQ、RabbitMQ、Kafka等都是大家耳熟能详的消息队列,了解消息队列的原理,我们也可以尝试构建一个自己的消息队列。

从消息队列的功能上思考 - 生产者创建一条消息流,通过http或tcp协议传输到消息队列中;消息队列对数据流进行处理,在适当的时机发送给合适的消费者;消费者接收处理消息后,将处理结果反馈给消息队列服务。简而言之,消息队列作为中间人,为生产者和消费者架起数据流传输的桥梁,使生产者和消费者之间无需关注自己在和谁交换数据,整个过程都是异步的。

向后走一步,我们会发现必须面对生产者、消费者、消息队列之间的关系如何管理的问题。在分布式服务中,生产者与消费者之间通常是多对多的关系,为确保消息的正确投递,消息队列可以无需感知生产者的存在,但必须感知消费者的存在,否则生产者必须在每个消息头部附带消费者的地址信息,很大程度地削减了消息队列的作用。

通过这个场景预设,我们就可以确定一个基本的消息队列需要实现哪些功能:

  • 消息接收
  • 消息转发
  • 消费者确认
  • 消费者注册

消息通讯参与者

从功能上思考

消息队列的功能思考-脑图

消息接收、转发与确认

MQ需要开放一个服务用于接收消息,接收到消息并不会马上转发出去,本地承载消息的堆积、错峰限流以及更多的处理,这才体现出消息队列的作用。在消息的承载堆积上,考虑到持久化和效率,可以选用Redis作为缓存,(许多简易的消息队列甚至可以利用Redis的List实现)。消息队列接收到生产者的消息时,MQ需要知道消息从何而来去往何处、消息内容是否重复,此时就需要统一地包装消息内容。根据实现的不同,消息的格式也不尽相同。

消息到达MQ时,MQ将其缓存到本地,然后经过处理后,使用队列投递到消费者。在这个过程中,需要思考如何保证消息的唯一性,以及如何确保消息投递给正确的消费者:
1) 消息的唯一性可以通过全局唯一的messageId保证,生成全局唯一的Id有很多方法,比如UUID、ObjectId
2) 消息投递给正确的消费者,需要将消息和消费者关联起来。消息和消费者通过消费者当然可以通过IP地址进行关联(类似RPC的调用),但这样的话在容错和负载均衡上就存在短板,我们可以通过applicationId进行关联,每个消费者注册到不同的application中,消息队列通过消息实体的附带的applicationId将消息转发给不同的消费者

消息投递给消费者的两种模型分别为pull与push,关于这两种模型,可以通过Git的分支更新理解:
假如你有一个Repository克隆到本地,你想和Server上的代码保持同步,但你又不知道Server上的代码何时更新,你可以通过脚本在固定的周期下拉取远程代码(git pull),比如每隔5分钟更新一次,这就是Pull模型;固定周期地pull,就会造成请求的忙等或更新延迟,你可以通过服务端的钩子(hook),代码变更时推送(Push)通知及时更新,这种就是Push模型。在消息队列的设计中,多使用的是Push模型,至于满消费造成的消息堆积等问题,就要根据应用场景斟酌了。

消费者确认是消息队列投递的过程中很重要的一步,因为消息队列必须确保消息的成功落地。当消费者成功地ack,消息队列才能确定消息投递成功,进而将本地的缓存删除。根据消息处理逻辑的不同,消费者处理不同消息所需的时间也不同,所以MQ应当允许消费者主动确认。在这种情况下,我们可以将消息到达消费者的反馈和消息确认的反馈分开,MQ确认消息投递到消费者后,可以先等待一段时间避免消费者确认时频繁建立连接;也可以放心地断开连接等待消费者主动地 ack 或 reject 等动作。

消费者注册

就在消息接收和转发中提到的,消费者可以注册到不同的application中,这样消费者之间就通过application产生隔离,并且消息实体中也无需显式地指定目标地址,所有的消费者地址由消息队列管理并定期进行心跳检测。

消息通讯中,各个参与者交互的时序图如下:

消息通讯的时序图

设计实现的思路

设想一个消息发送与转发的过程:当连接建立时,客户端发送给MQ的字节流被解码成消息实体,通过通道传递给正在等待的其它handler处理,当handler处理完成之后,又通过通道传递给下一个handler,这个过程中消息会经历编码、缓存、进入队列、投递给消费者等一系列过程。

上述的流程,可以拆分为四个通道分别处理,每个通道独立工作,直到连接关闭或消息队列停止服务:

  • 消息连接通道: 负责接收生产者、消费者发过来的消息流
  • 消息解码通道: 负责将消息流解析为格式化的消息实体对象
  • 消息处理通道: 根据消息类型、消息ID对消息进行处理、并产生一个消息处理结果,比如消息缓存成功、消费者注册成功等
  • 消息回复通道: 负责将消息处理结果编码,通过相应的通信协议发送(回复)给接收方

消息接收与投递

除此之外,我们需要两个独立的worker:

  • 消息入队列: 将消息(ID)添加到待投递队列中
  • 消息出队列: 从待投递队列、待重传队列、死信队列中取出消息ID,根据消息状态做投递、添加到重试队列等处理动作

传输协议与消息格式

MQ与生产者、消费者之间的通信可以基于TCP协议,由于消息大小不肯能统一,必须解决TCP粘包问题。TCP粘包的一种典型处理办法是使用特殊组发分隔,这里可以模仿HTTP服务的响应头,使用两个换行符\r\n间隔一条参数,参数的key和value之间使用=间隔,使用四个换行\r\n\r\n符间隔一段消息。

为了完整地包含消息实体所附带的信息,可以定义如下属性作为消息的统一格式:

参数 类型 描述
appid string 应用的ID,SKMQ会根据应用ID将消息投递给不同的消费者
msgid string 消息ID,必须保证ID的唯一性,SKMQ会过滤重复的消息
type string 消息类型,生产者发送消息、消息队列投递消息、消费者反馈都会附带相应的消息类型
content byte 消息内容,SKMQ会将它转发给相应的消费者

消息的 type 属性

消息类型包含了生产者生产的消息类型、消费者反馈的消息类型以及消息队列返回的消息类型,以下字段基本可用涵盖所有的消息类型:

  • topic 生产者要发送的消息类型 - 应用内广播

  • queue 生产中要发送的消息类型 - 点对点单播

  • push MQ推送消息时的默认类型

  • resp MQ所有的返回信息类型均为resp,此时的返回内容为json text,状态包含在返回内容中

    1
    2
    3
    4
    {
    "Status": "ack | reject | error",
    "Content": "response msg"
    }
  • arrived MQ投递消息时,消费者在接收到消息时需先发送arrived类型的消息,在确保消息落地时才发送ack消息

  • ack 对消息队列的正常响应信号

  • reject 当消费者无法消费时,发送一个reject消息给消息队列,消息队列收到reject时,会将消息投入到待重传队列重新排队

  • error 消息处理失败的标志,消息同样会进入待重传队列

  • register 注册收件人(消费者),此时消息的content应该为合法的json text,否则无法完成注册。

    1
    2
    3
    4
    5
    6
    7
    {
    "id": "recipient id",
    "app_id": "application id",
    "host": "",
    "port":"",
    "weight": 0
    }

消息投递与接收

  • 消息投递
    这里的消息投递时,使用的是Push模型,所以消费者节点需要维持一个开放的侦听端口,随时等候MQ的消息投递。
    消费者注册、生产者推送消息、消息投递、消费者响应等消息内容,全都是以上述相同的内容编码规则进行传递。

  • ack
    消息到达消费者节点时,消费者先发送一个arrived类型的消息告知MQ信件已经到达,MQ会等待一段时间,
    在这段时间内,如果消费者很快完成任务处理,可以立即发送ack响应告知MQ消息已经处理完成;若未完成,
    MQ会关闭连接,消费者可以随后主动发送ack消息告知MQ消息已处理完成

1
2
3
4
5
6
7
delivery: MQ -> msgid=id_xxx\r\ntype=push\r\ncontent=bytes_xxx\r\n\r\n -> Consumer

process1:
MQ -> wait -> Consumer -> process -> Consumer -> msgid=id_xxx\r\ntype=ack\r\n\r\n -> MQ
process2:
1) Consumer -> process; MQ -> connection closed;
2) Consumer -> msgid=id_xxx\r\ntype=ack\r\n\r\n -> MQ

心跳检测

为了消息投递的稳定性,MQ会定期监测消费者节点,通过心跳包检查节点是否失联并及时将其标记,不参与下次消息接收,所以消息节点需要开放侦听端口,用于接收和反馈心跳包: 只需在收到内容为 ping\r\n\r\n的数据时,返回一个pong\r\n\r\n的数据包

1
2
3
4
5
6
7
8
9
   connect.SetWriteDeadline(time.Now().Add(ConnectTimeOut))
err := SendMessage(connect, []byte(PING))
if err != nil {
return false
}

buf := make([]byte, 10)
connect.SetReadDeadline(time.Now().Add(ConnectTimeOut))
read, err := connect.Read(buf)

BTW

以上为设计消息队列的一些思路,利用Golang在goroutine + channel上的优势,可以很简洁地实现这些行为,
比如下面链接的Github仓库就是我使用Go实现的基于Redis的消息队列:

SKMQ,轻量的可限流队列