RocketMQ

一.RocketMQ入门

1.特性

  • 原生分布式
  • 两种消息拉取
  • 严格消息顺序
  • 特有的分布式协调器
  • 亿级消息堆积

2.基本概念

  • producer

    消息生产者,负责产生消息,一般由业务系统负责产生消息

  • consumer

    消息消费者,负责消费消息,一般是后台系统负责异步消费

  • push consumer

    封装消息拉取,消费进程和内部

  • pull consumer

    主动拉取消息,一旦拉取到消息,应用的消费进程进行初始化

  • producer group

    一类producer的集合名称,这类producer通常发送一类消息,且发送逻辑一致

  • consumer group

    一类consumer的集合名称,这类consumer通常消费一类消息,且消费逻辑一致

  • broker

    消息中转角色,负责存储消息,转发消息,这里就是rockermq server

  • topic

    消息的主题,用于定义并在服务端配置,消费者可以按主题进行订阅,也就是消息分类,通常一个系统一个topic

  • Message

    在生产者、消费者、服务器之间传递的消息,一个message必须属于一个topic

  • namesrv

    一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要

    是接收broker的注册,接收客户端的路由请求并发挥路由信息

  • offset

    偏移量,消费者拉取消息时需要知道上一次消费到了什么位置,这一次从哪里开始

  • partition

    • 分区,topic物理上的分组,一个topic可以分为多个分区,每个分区是一个有序队列。

    分区中的消息都会给分配一个有序的id,也就是偏移量

  • Tag

    用于对消息进行过滤,理解为message的标记,同一业务不同目的的message可以用相同的topic但是可以用不同的tag来区分

  • key

    消息的key字段是为了唯一表示消息的,为了方便查阅,不是说必须设置,只是说设置了方便开发和运维定位问题。比如:这个key可以是订单的id等。

3.安装运行

3.1环境准备

centos7、jdk8

3.2下载rocketmq

官网下载 wget

3.3运行

注意: 启动默认 nameserver: 4g broker:8g 需要调整

nameserver默认占用内存配置在:/rocketmq/bin/runserver.sh -Xms1g -Xmx1g

broker默认使用内存大小配置: /rocketmq/bin/runbroker.sh -Xms2g -Xmx2g

  • 启动nameserver:

              nohup sh mqnamesrv ~~-n "192.168.164.131:9876"~~> ../logs/namesrv.log 2>&1 &
    
              查看运行进程 ps -ef|grep NamesrvStartup              
  • 启动broker:

       nohup sh mqbroker ~~-n localhost:9876~~>../logs/broker.log 2>&1 &(并不完整)

    nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf >../logs/broker.log 2>&1 &

    指定加载配置文件

    查看broker是否运行 ps -ef|grep broker

  • 停止broker

       bin/mqshutdown broker
  • 停止nameserver

       bin/mqshutdown namesrv

    验证步骤:注意必须在同一个会话中

  • 发送消息

    export NAMESRV_ADDR=localhost:9876

    bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

  • 接收消息

    bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    注意:默认会发送很多数据,注意_‘’

3.4可视化工具

3.4.1. git clone源码

git clone https://github.com/apache/rocketmq-externals

3.4.2. 切换版本

git checkout rocketmq-console-1.0.0

3.4.3. 编译为jar

mvn clean package -Dmaven.test.skip=true

3.4.4. 启动
# jar包在target目录下面,你可以放在一台服务器上面运行
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8081--rocketmq.config.namesrvAddr=192.168.100.242:9876
# --server.port springboot内置tomcat的端口号,默认8080;
# --rocketmq.config.namesrvAddr  nameserver的地址

nohup java -jar rocketmq-console-ng-1.0.0.jar –server.port=8081–rocketmq.config.namesrvAddr=node1:9876>rocketmqConsole.log 2>&1 &

3.5应用

rocketmq的客户端:maven 依赖 org.apache.rocketmq rocketmq-client

3.5.1java应用运行出现问题

1.无法连接

gct09F.png

查看console控制台,集群部分,发现broker在namesrv上面注册地址和所在服务器实际地址不一致

gctXwr.png

*解决办法:broker端启动时,需要指定加载配置文件,并设置broker当前服务器ip

nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf >../logs/broker.log 2>&1 &

*参考:https://blog.csdn.net/huang_550/article/details/90693656

二.架构方案及角色详解

  • RocketMQ角色介绍
  • RocketMQ架构方案
  • RocketMQ集群部署配置

1.角色介绍

RocketMQ由四个角色组成:

  • producer:消息生产者
  • consumer:消费者
  • broker: MQ服务,负责接收、分发消息
  • nameserver:负责mq服务之间的协调

2.架构方案

gCoI6R.png

2.1Nameserver Cluster

  • 服务注册与发现
  • 路由管理

nameserver 提供轻量级服务发现与路由。

每个名称服务器记录完整的路由信息,提供相应的读写服务,并支持快速的存储扩展。

3.安装运行

下载安装包,解压

更改nameserver、broker默认内存配置

./bin/mqadmin 命令管理工具

./bin/mqadmin clusterlist -n

4.集群

在应用包下有双主双从同步、双主双从异步、主从配置文件例子

nameserver集群不互相同步数据,会保存所有broker信息

gcC5q8.png

配置说明1

gcC8Tg.png

配置说明2

gcCMtX.png

配置说明3

gcCO5G.png

三.有序消息

场景:同时存钱、取钱短信通知

1.有序消息

    有序消息又叫顺序消息(FIFO消息),是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这个消息必须按顺序处理才行。顺序消息分为**全局顺序**和**分区(queue)顺序**

1.1全局顺序

   一个topic内所有的消息都发布到同一个queue中,按照先进先出的顺序进行发布和消费。

  ***适用场景:***性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景

1.2 分区顺序

  对于指定的一个Topic,所有的消息根据sharding key 进行区块(queue)分区,同一个queue内的消息按照严格的FIFO顺序进行发布和消费。

    sharding key 是顺序消息中用来区分不同分区的关键手段,和普通消息的key是完全不同的概念。

适用场景:性能要求高,根据消息中的sharding key去决定消费发送到哪一个queue。

1.3 全局顺序和分区顺序对比

  • 消息类型对比

    消息类型 支持事务消息 支持定时消息 性能
    无序消息(普通、事务、定时、延时消息) 最高
    分区顺序消息
    全局顺序消息 一般
  • 发送方式对比

    消息类型 支持可靠同步发送 支持可靠异步发送 支持oneway发送
    无序消息(普通、事务、定时、延时)
    分区顺序消息
    全局顺序消息

2.如何保证消息顺序

在MQ的模型中,顺序需要由3个阶段去保障:

  1. 消息被发送时保持顺序
  2. 消息被存储时保持和发送的顺序一致
  3. 消息被消费时保持和存储的顺序一致

3.RocketMQ顺序消息的实现

RocketMQ消费端有两种类型,MQPullConsumer和MQPushConsumer

本质上底层都是通过pull机制去实现,pushConsumer是一种API封装

3.1MQPullConsumer

     MQPullConsumer 由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List<MessageExt> msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。

3.2MQPushConsumer

    MQPushConsumer由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。

4.有序消息的缺陷

    发送顺序消息无法利用集群的FailOver特性,因为不能更换MessageQueue进行重试,因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
  • 消费的并行读依赖于queue数量
  • 消费失败时无法跳过

5.有序消息的使用

官网教程:http://rocketmq.apache.org/docs/order-example

四.订阅机制和定时消息

  • 发布订阅的基本概念
  • RocketMQ订阅模式实现原理
  • 使用订阅模式
  • 定时消息的基本概念
  • Broker定时消息发送逻辑
  • 使用定时消息

1.发布订阅

    发布订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都将得到通知。

2.RocketMQ的发布订阅

RocketMQ的消息订阅分为两种模式:

  • Push模式(MQPushConsumer):broker主动向消费者推送

  • Pull模式(MQPullConsumer):消费者在需要消息时,主动到broker拉取

    在rocketmq具体实现中,push和pull模式都是采用消费者主动从broker拉取消息。

3.定时消息

   定时消息是指消息发送到broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。

  如果要支持任意的时间精度,在broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。

 RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level级别,例如,定时5s、10s、1m等。

使用定时消息,官方提供例子http://rocketmq.apache.org/docs/schedule-example/

五.批量消息和事务消息

1.批量消息

     在很多调优时,比如数据库批量处理,有些请求进行合并发送等都是类似批量的实现。RocketMQ批量发送也是为了追求性能,特别在消息数量特别大的时候,批量效果就非常明显。

1.1.使用批量消息的限制

  • 同一批次的消息应该具有相同主题、相同的消息配置
  • 不支持延迟消息
  • 建议一个批量消息大小最好不要超过1MB

官方实例:http://rocketmq.apache.org/docs/batch-example

2.事务消息

   RocketMQ的事务消息,是指Producer端消息发送事件和本地事务事件,同时成功或同时失败。

gcc7Aa.png

2.1事务消息的使用约束

  • 事务消息不支持定时和批量

  • 为了避免一个消息被多次检查,导致半数队列消息堆积,RocketMQ限制了单个消息的默认检查次数为15次,

    通过修改broker配置文件中的transactionCheckMax参数进行调整

  • 特定的时间段之后才检查事务,通过broker配置文件参数transactionTimeOut或用户配置CHECK_IMMUNITY_TIME_IN_SECONDS调整时间

  • 一个事务消息可能被检查或消费多次

  • 提交过的消息重新放到用户目标主题可能会失败

  • 事务消息的生产者ID不能与其他类型消息的生产者ID共享

2.2事务状态

三种事务消息状态:

TransactionStatus.CommitTransaction 提交事务,允许消费者消费这个消息

TransactionStatus.RollbackTransaction 回滚事务,消息将会被删除或不再允许消费

TransactionStatus.Unknown 中间状态,MQ需要重新检查来确定状态

六.RocketMQ中高性能最佳实践

1.最佳实践之Producer

  1. 一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。

    只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤

  2. 每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题

    服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。

    由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

  3. 如有可靠性需要,消息发送成功或者失败,要打印消息日志(sendresule和key信息)

  4. 如果相同性质的消息量大,使用批量消息,可以提升性能

  5. 建议消息大小不超过512KB

  6. send(msg) 会阻塞,如果有性能要求,可以使用异步的方式:send(msg,callback)

  7. 如果在一个jvm中,有多个生产者进行大数据处理,建议:

    • 少数生产者使用异步发送方式(3~5个)
    • 通过setinstanceName方法,给每一个生产者设置一个实例名
  8. send消息方法,只要不抛出异常,就代表发送成功。但是发送成功会有多个状态,在sendresule里定义

    • SEND_OK:消息发送成功
    • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    • FLUSH_SLAVE_TIMEOUT : 消息发送成功,但是服务器同步到slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    • SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,此时只有服务器宕机,消息才会丢失
    • 如果状态是FLUSH_DISK_TIMEOUT或FLUSH_SLAVE_TIMEOUT,并且broker正好关闭,此时可以丢失这条消息,或者重发。建议最好重发,由消费者去重
    • Producer向broker发送请求会等待响应,但如果达到最大等待时间,未得到响应,则客户端将抛出RemotingTimeoutException
    • 默认等待时间是3秒,如果使用send(msg,timeout),则可以自己设定超时时间,但超时时间不能设置太小,因为broker需要一些时间来刷新磁盘或与从属设备同步
    • 如果该值超过syncFlushTimeout ,则该值可能影响不大,因为broker可能会在超时之前返回FLUSH_SLAVE_TIMEOUT或FLUSH_SLAVE_TIMEOUT的响应
  9. 对于消息不可丢失应用,务必要有消息重复机制

    Producer的send方法本身支持内部重试:

    • 至多重试3次
    • 如果发送失败,则轮转到下一个broker
    • 这个方法的总耗时时间不超过sendmsgtimeout设置的值,默认10s。所以如果本身向broker发送消息产生超时异常,就不会再做重试

    以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议将消息存储到db,由后台线程定时重试,保证消息一定到达broker。

2.最佳实践之Consumer

3.最佳实践之NameServer

4.JVM 与Linux内核配置


   转载规则


《RocketMQ》 keyj 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
少有人走的路 少有人走的路
第一部分:自侓自律是解决人生问题最主要的工具,也是消除人生痛苦最重要的方法。 1.问题和痛苦0623 人生苦难重重。 这是个伟大的真理,是世界上最伟大的真理之一。它的伟大之处在于,一旦我们领悟了这句话的真谛,就能从苦难中解脱出来,
2019-06-23 keyj
下一篇 
日常问题记录 日常问题记录
centos7虚拟机1.每次虚拟机环境重启后ip都会变动,极不方便。设置静态ip, 参考:https://www.cnblogs.com/jadedoo/p/9967111.html 2.idea将鼠标定位到上次编辑的位置
2019-06-21 keyj
  目录