贝博恩创新科技网

Apache RocketMQ教程,如何快速入门与实战?

Apache RocketMQ 完整教程

目录

  1. 什么是 RocketMQ?
  2. 核心概念
  3. 为什么选择 RocketMQ?
  4. 环境准备
  5. 快速入门:一个简单的生产者-消费者示例
  6. 核心特性详解
    • 消息发送
    • 消息消费
    • 消息模式
    • 消息队列 的选择策略
  7. 进阶实践
    • 顺序消息
    • 延时消息
    • 事务消息
    • 消息过滤
  8. 管理控制台
  9. 最佳实践

什么是 RocketMQ?

Apache RocketMQ 是一款由阿里巴巴开源,并捐赠给 Apache 基金会的分布式消息中间件和流计算平台,它采用 Java 语言开发,专为海量、高并发、高可用的互联网应用场景而设计,主要用于在分布式系统中提供可靠的异步通信和解耦。

Apache RocketMQ教程,如何快速入门与实战?-图1
(图片来源网络,侵删)

RocketMQ 就像一个“邮箱”,应用程序(生产者)可以把“信件”(消息)投递进去,而其他应用程序(消费者)可以从这个邮箱里取出信件进行处理,这样,发送方和接收方就不需要直接通信,实现了系统的解耦。

核心概念

在开始使用之前,理解以下几个核心概念至关重要:

  • NameServer (名称服务器):RocketMQ 的“注册中心”,它负责管理 Topic 的路由信息,Broker 的状态信息,生产者和消费者通过 NameServer 来获取 Broker 的地址,从而进行通信,NameServer 集群之间是无状态的,可以水平扩展。
  • Broker (消息代理):RocketMQ 的“消息服务器”,它负责存储消息、处理消息的发送和拉取,一个 Broker 通常包含一个或多个 MessageQueue,Broker 会向所有 NameServer 注册自己,并定期发送心跳。
  • Topic (主题):消息的逻辑分类,消息发送者将消息发送到特定的 Topic,消息消费者订阅特定的 Topic 来接收消息,可以理解为消息的“分类标签”。
  • MessageQueue (消息队列):Topic 的物理分区,一个 Topic 可以被分为多个 MessageQueue,分布在不同的 Broker 上,这提高了消息的并行处理能力,是 RocketMQ 实现高吞吐量的关键。
  • Producer (生产者):消息的发送方,负责创建消息,并指定 Topic,然后发送到 Broker。
  • Consumer (消费者):消息的接收方,负责从 Broker 拉取或接收消息并进行处理。
  • Message (消息):传输的数据单元,由消息体(Body)、消息头(Headers)和标签组成。
  • Consumer Group (消费者组):由多个 Consumer 实例组成的组,它们共同消费一个 Topic 下的消息,实现负载均衡和容错,一个 Topic 的同一个 MessageQueue 只能被同一个 Consumer Group 中的一个 Consumer 消费。
  • Offset (消费位点):记录消费者在某个 MessageQueue 上消费到的位置,用于实现消息消费的断点续传。

为什么选择 RocketMQ?

  • 高性能:经过阿里巴巴“双十一”等超大规模场景的洗礼,具备极高的吞吐量,单机可达 10 万级 TPS。
  • 高可靠:支持同步刷盘、异步刷盘、主从同步(Master-Slave)等多种数据持久化机制,确保消息不丢失。
  • 高可用:Broker 支持主从部署,NameServer 也支持集群部署,避免了单点故障。
  • 功能丰富:支持顺序消息、延时消息、事务消息、消息过滤等高级特性,满足复杂业务场景。
  • 生态完善:提供管理控制台、丰富的客户端 SDK(Java, C++, Go, Python 等)和社区支持。

环境准备

在开始之前,你需要确保你的系统已经安装了 Java 环境。

  1. 检查 Java 版本 (RocketMQ 推荐 Java 8 或以上)
    java -version
  2. 下载 RocketMQ 访问 RocketMQ 官网下载页面 下载最新稳定版的二进制包,下载 rocketmq-all-4.9.4-bin-release.zip
  3. 解压
    # 假设下载到 /opt 目录
    cd /opt
    unzip rocketmq-all-4.9.4-bin-release.zip
    mv rocketmq-all-4.9.4-bin-release /usr/local/rocketmq
  4. 配置环境变量 (可选)
    vim ~/.bash_profile
    # 添加以下内容
    export ROCKETMQ_HOME=/usr/local/rocketmq
    export PATH=$PATH:$ROCKETMQ_HOME/bin
    # 使配置生效
    source ~/.bash_profile

快速入门:一个简单的生产者-消费者示例

启动 NameServer 和 Broker

RocketMQ 提供了便捷的 Shell 脚本来启动服务。

Apache RocketMQ教程,如何快速入门与实战?-图2
(图片来源网络,侵删)
  1. 启动 NameServer
    nohup sh $ROCKETMQ_HOME/bin/mqnamesrv &
  2. 启动 Broker
    nohup sh $ROCKETMQ_HOME/bin/mqbroker -n localhost:9876 &
    • -n localhost:9876 指定了 NameServer 的地址。
    • nohup& 让进程在后台运行。

小贴士:默认配置下,RocketMQ 启动会占用较多内存,可能导致普通机器启动失败,可以修改 runbroker.shrunserver.sh 文件,调整 JVM 内存大小。 在 runbroker.sh 中修改: JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

创建 Maven 项目

在你的 IDE(如 IntelliJ IDEA)中创建一个 Maven 项目,并添加 RocketMQ 客户端依赖。

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.4</version>
</dependency>

编写生产者代码

创建一个 Producer 类,用于发送消息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个生产者,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 2. 指定 NameServer 的地址
        producer.setNamesrvAddr("localhost:9876");
        // 3. 启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 4. 创建一条消息
            // 参数:Topic, Tags, Body
            Message msg = new Message("TopicTest", // Topic
                    "TagA", // Tag,用于消息过滤
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息体
            );
            // 5. 发送消息,并同步等待结果
            SendResult sendResult = producer.send(msg);
            // 6. 打印发送结果
            System.out.printf("%s%n", sendResult);
        }
        // 7. 关闭生产者
        producer.shutdown();
    }
}

编写消费者代码

创建一个 Consumer 类,用于接收消息。

Apache RocketMQ教程,如何快速入门与实战?-图3
(图片来源网络,侵删)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        // 1. 创建一个消费者,并指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
        // 2. 指定 NameServer 的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 3. 订阅一个或多个 Topic,并指定消息过滤规则
        // "*" 表示订阅该 Topic 的所有 Tag
        consumer.subscribe("TopicTest", "*");
        // 4. 注册一个消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 5. 处理接收到的消息
                for (MessageExt msg : msgs) {
                    System.out.println("收到消息: " + new String(msg.getBody()));
                }
                // 返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 6. 启动消费者
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

运行测试

  1. 先运行 Consumer,它会启动并等待消息。
  2. 然后运行 Producer,它会发送 10 条消息。
  3. 观察控制台输出,消费者会打印出收到的每一条消息。

恭喜!你已经成功运行了你的第一个 RocketMQ 应用!

核心特性详解

消息发送

RocketMQ 提供了三种消息发送方式:

  • 同步发送: producer 等待 broker 返回发送结果,最可靠的方式,适用于对可靠性要求高的场景,如重要的通知、短信等。
  • 异步发送: producer 发送消息后不等待 broker 返回结果,而是通过回调函数处理结果,适用于对响应时间敏感,但允许少量失败的场景,如日志收集。
  • 单向发送: producer 只管发送,不关心是否发送成功,性能最高,但可靠性最差,适用于对可靠性要求不高的场景,如日志、埋点数据。

消息消费

RocketMQ 提供了两种消费模式:

  • Push 模式: Broker 主动将消息推送给消费者,消费者只需定义好监听器即可,这种方式对消费者来说更简单,但 Broker 需要维护长连接,实现相对复杂。
  • Pull 模式: 消费者主动从 Broker 拉取消息,这种方式更灵活,消费者可以按需拉取,但需要消费者自己管理消费位点,实现更复杂。

注意: RocketMQ 的 Push 模式底层是基于 Pull 模式实现的,Broker 会根据消费者的拉取情况,将消息主动推送过去,以减少消费者的拉取延迟。

消息模式

在 Push 模式下,消费者组可以消费消息的方式:

  • 集群消费: 一个 Topic 的一个 MessageQueue 只能被同一个 Consumer Group 中的一个消费者实例消费,这是最常用的模式,可以实现负载均衡和容错,如果某个消费者宕机,它消费的 MessageQueue 会被组内其他消费者接管。
  • 广播消费: Topic 的所有 MessageQueue 会被同一个 Consumer Group 中的所有消费者实例消费,每条消息都会被所有消费者处理一遍,适用于需要全量处理的场景,如数据库同步、服务状态同步等。

消息队列 的选择策略

当 Producer 发送消息到 Topic 时,如何选择具体的 MessageQueue 呢?RocketMQ 默认使用 MessageQueueSelector

  • SelectMessageQueueByRandom: 随机选择一个 MessageQueue,这是默认策略,在负载均衡方面表现良好。
  • SelectMessageQueueByHash: 根据 key 的哈希值选择 MessageQueue,这可以保证相同 key 的消息发送到同一个队列,是实现顺序消息的基础。
  • SelectMessageQueueByMachineRoom: 根据机房选择,实现同城多活。

进阶实践

顺序消息

RocketMQ 提供严格的消息顺序保证,分为两类:

  1. 分区顺序: 保证同一个 MessageQueue 上的消息是按顺序消费的,通过 MessageQueueSelector 实现,确保相同 key 的消息进入同一个队列。
    // 发送顺序消息示例
    for (int i = 0; i < 10; i++) {
        int orderId = i % 10; // 模拟订单ID
        Message msg = new Message("OrderTopic", "TagA", ("OrderID: " + orderId).getBytes());
        // 使用 hash 选择器,保证相同 orderId 的消息进入同一个队列
        SendResult sendResult = producer.send(msg, (mqs, msg, arg) -> {
            Integer id = (Integer) arg;
            int index = id % mqs.size();
            return mqs.get(index);
        }, orderId);
    }
  2. 全局顺序: 整个 Topic 的所有消息都严格按照 FIFO 的顺序被消费,实现方式很简单:将 Topic 设置为只有一个 MessageQueue,但这会严重影响吞吐量,不推荐在高并发场景下使用。

延时消息

RocketMQ 支持消息在指定时间后才被消费者消费,常用于以下场景:

  • 订单创建后,30 分钟未支付则自动取消。
  • 活动结束后,给参与的用户发送感谢邮件。

实现方式:在发送消息时,设置 delayTimeLevel 属性,RocketMQ 内置了 18 个级别的延时时间,从 1 秒到 2 小时不等。

// 发送延时消息
Message message = new Message("DelayTopic", "TagA", "This is a delay message".getBytes());
// 设置延时级别,3 代表 10 秒
message.setDelayTimeLevel(3);
producer.send(message);
延时级别 时间
1 1s
2 5s
3 10s
4 30s
5 1m
6 2m
7 3m
8 4m
9 5m
10 6m
11 7m
12 8m
13 9m
14 10m
15 20m
16 30m
17 1h
18 2h

注意: 延时消息的实现原理是,Broker 收到消息后,将其存入一个“延时队列”,由一个专门的定时任务检查,到期后再将其投递到目标 Topic 的队列中,延时级别是固定的,不支持任意时间的延时。

事务消息

事务消息用于解决分布式事务问题,确保“本地事务”和“消息发送”的原子性,用户下单场景,需要同时扣减库存和发送订单通知。

RocketMQ 的事务消息流程如下:

  1. 发送半消息: Producer 发送一条“半消息”(Half Message)到 Broker,这条消息对消费者不可见。
  2. 执行本地事务: Producer 执行本地业务逻辑(如扣减库存)。
  3. 提交或回滚
    • 如果本地事务成功,Producer 向 Broker 发送 COMMIT 命令,Broker 将半消息标记为可消费。
    • 如果本地事务失败,Producer 向 Broker 发送 ROLLBACK 命令,Broker 删除半消息。
    • Producer 超时未响应,Broker 会向 Producer 发送回查请求,确认最终状态。
  4. 消息消费: 消费者最终能消费到这条被确认的事务消息。

消息过滤

RocketMQ 提供两种消息过滤方式:

  1. Tag 过滤: 在发送消息时指定 Tag,消费者在订阅时可以通过 TagA || TagB || TagC 的方式来指定订阅的 Tag,这是最轻量、最高效的方式。

    // Producer
    message.setTags("TagA");
    // Consumer
    consumer.subscribe("TopicTest", "TagA || TagB");
  2. SQL92 属性过滤: 在发送消息时,可以设置自定义的 User Properties(键值对),消费者可以使用类似 SQL 的语法进行过滤,这种方式更灵活,但性能稍差。

    // Producer
    message.putUserProperty("type", "vip");
    message.putUserProperty("level", "5");
    // Consumer
    consumer.subscribe("TopicTest", "type = 'vip' AND level > 3");

    注意: SQL 过滤需要在 Broker 端开启配置 enablePropertyFilter=true,Broker 会将消息内容全部加载到内存中,对性能有影响,需谨慎使用。

管理控制台

RocketMQ 提供了一个基于 Web 的管理控制台,方便我们查看和管理集群。

  1. 下载控制台源码
    git clone https://github.com/apache/rocketmq-externals.git
    cd rocketmq-externals/rocketmq-console
  2. 打包 确保你有 Maven,然后执行:
    mvn clean package -DskipTests
  3. 运行
    java -jar target/rocketmq-console-ng-2.0.0.jar
  4. 配置 NameServer 地址 运行时可以通过命令行参数指定:
    java -jar target/rocketmq-console-ng-2.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=localhost:9876
  5. 访问 打开浏览器,访问 http://localhost:8080,你就可以看到 RocketMQ 的集群状态、Topic 列表、消息轨迹等信息了。

最佳实践

  • Topic 设计: 不要滥用 Topic,按业务模块划分,一个 Topic 可以有多个 MessageQueue 来应对高并发。
  • Group 设计: Consumer Group 名字应具有业务含义,如 order-consumer-group,不同的业务场景使用不同的 Group。
  • 异常处理: 消费者处理消息时一定要做好异常捕获和重试机制,消费失败时,根据业务场景选择重试或记录死信队列。
  • 性能调优
    • 生产者: 批量发送消息、异步发送、使用合适的线程池。
    • 消费者: 合理设置 consumeMessageBatchMaxSize(每次拉取消息的最大数量)和 consumeThreadMin/consumeThreadMax(消费线程池大小)。
  • 监控与告警: 监控 Broker 的内存、磁盘、消息堆积量等关键指标,并设置告警,防患于未然。

本教程带你从零开始,系统地学习了 Apache RocketMQ 的核心概念、快速入门、进阶特性和最佳实践,RocketMQ 是一个功能强大且成熟的分布式消息系统,掌握它对于构建高可用、高性能的分布式应用至关重要。

下一步建议

  1. 动手实践: 尝试实现顺序消息、延时消息等进阶示例。
  2. 部署集群: 在多台机器上部署 NameServer 和 Broker 主从集群,体验高可用。
  3. 阅读源码: 如果对底层原理感兴趣,可以阅读 RocketMQ 的源码,特别是消息存储、事务消息等核心模块。
  4. 关注社区: 关注 RocketMQ 的官方博客和邮件列表,了解最新的动态和最佳实践。

希望这份教程对你有帮助!

分享:
扫描分享到社交APP
上一篇
下一篇