一.RocketMQ入门
1.特性
- 原生分布式
- 两种消息拉取
- 严格消息顺序
- 特有的分布式协调器
- 亿级消息堆积
- 组
2.基本概念
producer
消息生产者,负责产生消息,一般由业务系统负责产生消息
consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费
push consumer
封装消息拉取,消费进程和内部
pull consumer
主动拉取消息,一旦拉取到消息,应用的消费进程进行初始化
producer group
一类producer的集合名称,这类producer通常发送一类消息,且发送逻辑一致
consumer group
一类consumer的集合名称,这类consumer通常消费一类消息,且消费逻辑一致
broker
消息中转角色,负责存储消息,转发消息,这里就是rockermq server
topic
消息的主题,用于定义并在服务端配置,消费者可以按主题进行订阅,也就是消息分类,通常一个系统一个topic
Message
在生产者、消费者、服务器之间传递的消息,一个message必须属于一个topic
namesrv
一个无状态的名称服务,可以集群部署,每一个broker启动的时候都会向名称服务器注册,主要
是接收broker的注册,接收客户端的路由请求并发挥路由信息
offset
偏移量,消费者拉取消息时需要知道上一次消费到了什么位置,这一次从哪里开始
partition
- 分区,topic物理上的分组,一个topic可以分为多个分区,每个分区是一个有序队列。
分区中的消息都会给分配一个有序的id,也就是偏移量
Tag
用于对消息进行过滤,理解为message的标记,同一业务不同目的的message可以用相同的topic但是可以用不同的tag来区分
key
消息的key字段是为了唯一表示消息的,为了方便查阅,不是说必须设置,只是说设置了方便开发和运维定位问题。比如:这个key可以是订单的id等。
3.安装运行
3.1环境准备
centos7、jdk8
3.2下载rocketmq
官网下载 wget
3.3运行
注意: 启动默认 nameserver: 4g broker:8g 需要调整
nameserver默认占用内存配置在:/rocketmq/bin/runserver.sh -Xms1g -Xmx1g
broker默认使用内存大小配置: /rocketmq/bin/runbroker.sh -Xms2g -Xmx2g
启动nameserver:
nohup sh mqnamesrv ~~-n "192.168.164.131:9876"~~> ../logs/namesrv.log 2>&1 & 查看运行进程 ps -ef|grep NamesrvStartup
启动broker:
nohup sh mqbroker ~~-n localhost:9876~~>../logs/broker.log 2>&1 &(并不完整)
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf >../logs/broker.log 2>&1 &
指定加载配置文件
查看broker是否运行 ps -ef|grep broker
停止broker
bin/mqshutdown broker
停止nameserver
bin/mqshutdown namesrv
验证步骤:注意必须在同一个会话中
发送消息
export NAMESRV_ADDR=localhost:9876
bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
注意:默认会发送很多数据,注意
_‘’
3.4可视化工具
3.4.1. git clone源码
git clone https://github.com/apache/rocketmq-externals
3.4.2. 切换版本
git checkout rocketmq-console-1.0.0
3.4.3. 编译为jar
mvn clean package -Dmaven.test.skip=true
3.4.4. 启动
# jar包在target目录下面,你可以放在一台服务器上面运行
java -jar rocketmq-console-ng-1.0.0.jar --server.port=8081--rocketmq.config.namesrvAddr=192.168.100.242:9876
# --server.port springboot内置tomcat的端口号,默认8080;
# --rocketmq.config.namesrvAddr nameserver的地址
nohup java -jar rocketmq-console-ng-1.0.0.jar –server.port=8081–rocketmq.config.namesrvAddr=node1:9876>rocketmqConsole.log 2>&1 &
3.5应用
rocketmq的客户端:maven 依赖 org.apache.rocketmq rocketmq-client
3.5.1java应用运行出现问题
1.无法连接
查看console控制台,集群部分,发现broker在namesrv上面注册地址和所在服务器实际地址不一致
*解决办法:broker端启动时,需要指定加载配置文件,并设置broker当前服务器ip
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf >../logs/broker.log 2>&1 &
*参考:https://blog.csdn.net/huang_550/article/details/90693656
二.架构方案及角色详解
- RocketMQ角色介绍
- RocketMQ架构方案
- RocketMQ集群部署配置
1.角色介绍
RocketMQ由四个角色组成:
- producer:消息生产者
- consumer:消费者
- broker: MQ服务,负责接收、分发消息
- nameserver:负责mq服务之间的协调
2.架构方案
2.1Nameserver Cluster
- 服务注册与发现
- 路由管理
nameserver 提供轻量级服务发现与路由。
每个名称服务器记录完整的路由信息,提供相应的读写服务,并支持快速的存储扩展。
3.安装运行
下载安装包,解压
更改nameserver、broker默认内存配置
./bin/mqadmin 命令管理工具
./bin/mqadmin clusterlist -n
4.集群
在应用包下有双主双从同步、双主双从异步、主从配置文件例子
nameserver集群不互相同步数据,会保存所有broker信息
配置说明1
配置说明2
配置说明3
三.有序消息
场景:同时存钱、取钱短信通知
1.有序消息
有序消息又叫顺序消息(FIFO消息),是指消息的消费顺序和产生顺序相同,在有些业务逻辑下,必须保证顺序。比如订单的生成、付款、发货,这个消息必须按顺序处理才行。顺序消息分为**全局顺序**和**分区(queue)顺序**
1.1全局顺序
一个topic内所有的消息都发布到同一个queue中,按照先进先出的顺序进行发布和消费。
***适用场景:***性能要求不高,所有的消息严格按照FIFO原则进行消息发布和消费的场景
1.2 分区顺序
对于指定的一个Topic,所有的消息根据sharding key 进行区块(queue)分区,同一个queue内的消息按照严格的FIFO顺序进行发布和消费。
sharding key 是顺序消息中用来区分不同分区的关键手段,和普通消息的key是完全不同的概念。
适用场景:性能要求高,根据消息中的sharding key去决定消费发送到哪一个queue。
1.3 全局顺序和分区顺序对比
消息类型对比
消息类型 支持事务消息 支持定时消息 性能 无序消息(普通、事务、定时、延时消息) 是 是 最高 分区顺序消息 否 否 高 全局顺序消息 否 否 一般 发送方式对比
消息类型 支持可靠同步发送 支持可靠异步发送 支持oneway发送 无序消息(普通、事务、定时、延时) 是 是 是 分区顺序消息 是 否 否 全局顺序消息 是 否 否
2.如何保证消息顺序
在MQ的模型中,顺序需要由3个阶段去保障:
- 消息被发送时保持顺序
- 消息被存储时保持和发送的顺序一致
- 消息被消费时保持和存储的顺序一致
3.RocketMQ顺序消息的实现
RocketMQ消费端有两种类型,MQPullConsumer和MQPushConsumer
本质上底层都是通过pull机制去实现,pushConsumer是一种API封装
3.1MQPullConsumer
MQPullConsumer 由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List<MessageExt> msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。
3.2MQPushConsumer
MQPushConsumer由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。
4.有序消息的缺陷
发送顺序消息无法利用集群的FailOver特性,因为不能更换MessageQueue进行重试,因为发送的路由策略导致的热点问题,可能某一些MessageQueue的数据量特别大
- 消费的并行读依赖于queue数量
- 消费失败时无法跳过
5.有序消息的使用
官网教程:http://rocketmq.apache.org/docs/order-example
四.订阅机制和定时消息
- 发布订阅的基本概念
- RocketMQ订阅模式实现原理
- 使用订阅模式
- 定时消息的基本概念
- Broker定时消息发送逻辑
- 使用定时消息
1.发布订阅
发布订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖它的对象都将得到通知。
2.RocketMQ的发布订阅
RocketMQ的消息订阅分为两种模式:
Push模式(MQPushConsumer):broker主动向消费者推送
Pull模式(MQPullConsumer):消费者在需要消息时,主动到broker拉取
在rocketmq具体实现中,push和pull模式都是采用消费者主动从broker拉取消息。
3.定时消息
定时消息是指消息发送到broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。
如果要支持任意的时间精度,在broker层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。
RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level级别,例如,定时5s、10s、1m等。
使用定时消息,官方提供例子:http://rocketmq.apache.org/docs/schedule-example/
五.批量消息和事务消息
1.批量消息
在很多调优时,比如数据库批量处理,有些请求进行合并发送等都是类似批量的实现。RocketMQ批量发送也是为了追求性能,特别在消息数量特别大的时候,批量效果就非常明显。
1.1.使用批量消息的限制
- 同一批次的消息应该具有相同主题、相同的消息配置
- 不支持延迟消息
- 建议一个批量消息大小最好不要超过1MB
官方实例:http://rocketmq.apache.org/docs/batch-example
2.事务消息
RocketMQ的事务消息,是指Producer端消息发送事件和本地事务事件,同时成功或同时失败。
2.1事务消息的使用约束
事务消息不支持定时和批量
为了避免一个消息被多次检查,导致半数队列消息堆积,RocketMQ限制了单个消息的默认检查次数为15次,
通过修改broker配置文件中的transactionCheckMax参数进行调整
特定的时间段之后才检查事务,通过broker配置文件参数transactionTimeOut或用户配置CHECK_IMMUNITY_TIME_IN_SECONDS调整时间
一个事务消息可能被检查或消费多次
提交过的消息重新放到用户目标主题可能会失败
事务消息的生产者ID不能与其他类型消息的生产者ID共享
2.2事务状态
三种事务消息状态:
TransactionStatus.CommitTransaction 提交事务,允许消费者消费这个消息
TransactionStatus.RollbackTransaction 回滚事务,消息将会被删除或不再允许消费
TransactionStatus.Unknown 中间状态,MQ需要重新检查来确定状态
六.RocketMQ中高性能最佳实践
1.最佳实践之Producer
一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。
只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤
每个消息在业务层面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题
服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询这条消息内容,以及消息被谁消费。
由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
如有可靠性需要,消息发送成功或者失败,要打印消息日志(sendresule和key信息)
如果相同性质的消息量大,使用批量消息,可以提升性能
建议消息大小不超过512KB
send(msg) 会阻塞,如果有性能要求,可以使用异步的方式:send(msg,callback)
如果在一个jvm中,有多个生产者进行大数据处理,建议:
- 少数生产者使用异步发送方式(3~5个)
- 通过setinstanceName方法,给每一个生产者设置一个实例名
send消息方法,只要不抛出异常,就代表发送成功。但是发送成功会有多个状态,在sendresule里定义
- SEND_OK:消息发送成功
- FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- FLUSH_SLAVE_TIMEOUT : 消息发送成功,但是服务器同步到slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
- SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,此时只有服务器宕机,消息才会丢失
- 如果状态是FLUSH_DISK_TIMEOUT或FLUSH_SLAVE_TIMEOUT,并且broker正好关闭,此时可以丢失这条消息,或者重发。建议最好重发,由消费者去重
- Producer向broker发送请求会等待响应,但如果达到最大等待时间,未得到响应,则客户端将抛出RemotingTimeoutException
- 默认等待时间是3秒,如果使用send(msg,timeout),则可以自己设定超时时间,但超时时间不能设置太小,因为broker需要一些时间来刷新磁盘或与从属设备同步
- 如果该值超过syncFlushTimeout ,则该值可能影响不大,因为broker可能会在超时之前返回FLUSH_SLAVE_TIMEOUT或FLUSH_SLAVE_TIMEOUT的响应
对于消息不可丢失应用,务必要有消息重复机制
Producer的send方法本身支持内部重试:
- 至多重试3次
- 如果发送失败,则轮转到下一个broker
- 这个方法的总耗时时间不超过sendmsgtimeout设置的值,默认10s。所以如果本身向broker发送消息产生超时异常,就不会再做重试
以上策略仍然不能保证消息一定发送成功,为保证消息一定成功,建议将消息存储到db,由后台线程定时重试,保证消息一定到达broker。
2.最佳实践之Consumer
3.最佳实践之NameServer
4.JVM 与Linux内核配置