go语言使用RocketMq的示例代码

 更新时间:2026年01月28日 11:32:59   作者:你我的过去  
这篇文章主要介绍了go语言使用RocketMq的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

在 Go 语言中使用 Apache RocketMQ,官方推荐的方式是通过 apache/rocketmq-client-go 这个客户端库。它是 Apache 官方维护的 Go SDK,支持 Producer(生产者)、Push Consumer(推模式消费者)、Pull Consumer(拉模式消费者)等功能

一、下载并启动

来到rocketmq下载官网
在Binary下载中选择合适的版本下载

配置环境变量

进入broker.conf并进行修改

修改如下:

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

在bin目录下执行如下命令即可启动rocketmq
start mqnamesrv.cmd
.\mqbroker.cmd -n 127.0.0.1:9876 -c …\conf\broker.conf

综上,rocketmq在windows的配置与启动已完成

二、生产者

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
	// 创建RocketMQ生产者实例
	// 参数说明:
	// - producer.WithNsResolver: 设置NameServer地址解析器,指定NameServer地址为"127.0.0.1:9876"
	// - producer.WithRetry: 设置消息发送失败时的重试次数为2次
	p, err := rocketmq.NewProducer(
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
		producer.WithRetry(2),
	)
	if err != nil {
		fmt.Printf("create producer error: %s\n", err.Error())
		os.Exit(1)
	}

	err = p.Start()
	if err != nil {
		fmt.Printf("start producer error: %s\n", err.Error())
		os.Exit(1)
	}
	// 延迟执行生产者关闭操作,确保在main函数结束前关闭生产者连接并释放资源
	defer p.Shutdown()

	// 发送同步消息到RocketMQ服务器
	// 构建消息对象,指定主题和消息内容
	msg := &primitive.Message{
		Topic: "TestTopic",
		Body:  []byte("Hello RocketMQ from Go!"),
	}

	// 使用生产者同步发送消息
	// 参数说明:
	// - context.Background(): 使用空上下文进行消息发送
	// - msg: 要发送的消息对象
	// 返回值说明:
	// - res: 消息发送结果,包含消息ID等信息
	// - err: 发送过程中可能发生的错误
	res, err := p.SendSync(context.Background(), msg)//异步发送消息使用的是 SendAsync 方法
	if err != nil {
		fmt.Printf("send message error: %s\n", err)
	} else {
		fmt.Printf("send message success: %s\n", res.String())
	}

}

三、消费者

推模式示例:

import (
	"context"
	"fmt"
	"os"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
	// 创建RocketMQ推模式消费者实例
	// 参数说明:
	// - consumer.WithGroupName: 设置消费者组名称为"TestGroup"
	// - consumer.WithNsResolver: 设置NameServer地址解析器,指定NameServer地址为"127.0.0.1:9876"
	// 返回值说明:
	// - c: 创建的PushConsumer实例
	// - err: 创建过程中可能发生的错误
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName("TestGroup"),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
	)
	if err != nil {
		fmt.Printf("create consumer error: %s\n", err.Error())
		os.Exit(1)
	}

	// 订阅指定主题的消息
	// 参数说明:
	// - "TestTopic": 要订阅的主题名称
	// - consumer.MessageSelector{}: 消息选择器,此处使用默认选择器
	// - func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error): 消息处理回调函数
	//   - ctx: 上下文参数
	//   - msgs: 接收到的消息列表
	//   - 返回值: 消费结果和可能的错误
	// 返回值说明:
	// - err: 订阅过程中可能发生的错误
	err = c.Subscribe("TestTopic", consumer.MessageSelector{}, func(ctx context.Context,
		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, msg := range msgs {
			fmt.Printf("Received message: %s\n", string(msg.Body))
		}
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		fmt.Printf("subscribe error: %s\n", err.Error())
		os.Exit(1)
	}

	err = c.Start()
	if err != nil {
		fmt.Printf("start consumer error: %s\n", err.Error())
		os.Exit(1)
	}
	defer c.Shutdown()

	// 阻塞主 goroutine,防止程序退出
	select {}
}

消费者有推模式和拉模式两种模式,如下

1、推模式

优点:
实时性高 消息到达 Broker 后几乎立即被投递给消费者(延迟低),适合对实时性要求高的场景。
RocketMQ Push Consumer 会自动进行 队列分配(Rebalance),多个消费者实例能自动分摊 Topic 的消息队列
消费成功后自动提交 offset;失败可重试(支持重试 Topic)
缺点:
消费者被动接收,难以控制消费速率
消息由 Broker 主动“推送”(通过长轮询模拟),消费者无法决定何时拉、拉多少。
如果消息突发洪峰,消费者可能被瞬间压垮(即使有流控,也可能来不及响应)
常用场景:
实时消息处理(如订单支付通知、即时通讯)
希望快速消费、低延迟
不想手动管理拉取逻辑和 offset

2、拉模式

优点:
消费者自己控制何时、从哪个队列、拉多少条消息,完全手动管理消费进度(offset)。
可精确控制拉取频率、批量大小、消费速度,适合复杂调度逻辑。
缺点:
开发复杂度高,易出错,必须手动管理 offset,无自动负载均衡
实时性差,延迟不可控。若为了低延迟频繁拉取,又会增加 Broker 压力,产生大量空轮询(浪费网络和 CPU)
常用场景:
消费速度需要严格控制(如限流、批处理)
需要自定义消费策略(如按优先级消费)
消费逻辑复杂,需精细控制 offset

到此这篇关于go语言使用RocketMq的示例代码的文章就介绍到这了,更多相关go语言使用RocketMq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • go中实现字符切片和字符串互转

    go中实现字符切片和字符串互转

    这篇文章主要为大家详细介绍了go语言中如何实现字符切片和字符串互转,文中的示例代码讲解详细,具有一定的学习价值,感兴趣的小伙伴可以了解一下
    2023-11-11
  • Golang处理gRPC请求/响应元数据的示例代码

    Golang处理gRPC请求/响应元数据的示例代码

    前段时间实现内部gRPC框架时,为了实现在服务端拦截器中打印请求及响应的头部信息,便查阅了部分关于元数据的资料,因为中文网络上对于该领域的信息较少,于是在这做了一些简单的总结,需要的朋友可以参考下
    2024-03-03
  • go reflect要不要传指针原理详解

    go reflect要不要传指针原理详解

    这篇文章主要为大家介绍了go reflect要不要传指针原理详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Go语言for-range函数使用技巧实例探究

    Go语言for-range函数使用技巧实例探究

    这篇文章主要为大家介绍了Go语言for-range函数使用技巧实例探究,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2024-01-01
  • Go语言框架快速集成限流中间件详解

    Go语言框架快速集成限流中间件详解

    这篇文章主要为大家介绍了Go语言框架快速集成限流中间件详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-09-09
  • golang如何利用原始套接字构造UDP包详解

    golang如何利用原始套接字构造UDP包详解

    这篇文章主要给大家介绍了关于golang如何利用原始套接字构造UDP包的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用golang具有一定的参考学习价值,需要的朋友们下面来一起看看吧。
    2017-10-10
  • go开发过程中mapstructure使用示例详解

    go开发过程中mapstructure使用示例详解

    mapstructure是一个Go语言库,用于将映射(如map或struct)解码为结构体,便于处理JSON、YAML等配置文件数据,通过字段名或结构体标签控制解码,支持嵌套结构体、灵活处理多种数据源,需要注意错误处理,该库适合于Go开发中配置数据的读取和转换
    2024-10-10
  • 一文探索Go中的函数使用方式

    一文探索Go中的函数使用方式

    在编程中,函数是基本构建块之一,Go语言以其简洁明了的函数定义和调用语法而闻名,所以本文就来和大家聊聊Go中的函数概念及使用,感兴趣的可以了解下
    2023-09-09
  • go语言题解LeetCode1299将每个元素替换为右侧最大元素

    go语言题解LeetCode1299将每个元素替换为右侧最大元素

    这篇文章主要为大家介绍了go语言LeetCode刷题1299将每个元素替换为右侧最大元素示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • 详解如何使用Golang操作MongoDB数据库

    详解如何使用Golang操作MongoDB数据库

    在现代开发中,数据存储是一个至关重要的环节,MongoDB作为一种NoSQL数据库,提供了强大的功能和灵活的数据模型,与Golang的高性能和并发性能非常契合,本文将探讨Golang与MongoDB的完美组合,介绍如何使用Golang操作MongoDB数据库,需要的朋友可以参考下
    2023-11-11

最新评论