go语言使用RocketMq的示例代码
前言
在 Go 语言中使用 Apache RocketMQ,官方推荐的方式是通过 apache/rocketmq-client-go 这个客户端库。它是 Apache 官方维护的 Go SDK,支持 Producer(生产者)、Push Consumer(推模式消费者)、Pull Consumer(拉模式消费者)等功能
一、下载并启动
来到rocketmq下载官网
在Binary下载中选择合适的版本下载

配置环境变量

进入broker.conf并进行修改

修改如下:
brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
在bin目录下执行如下命令即可启动rocketmq
start mqnamesrv.cmd
.\mqbroker.cmd -n 127.0.0.1:9876 -c …\conf\broker.conf

综上,rocketmq在windows的配置与启动已完成
二、生产者
import (
"context"
"fmt"
"os"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
func main() {
// 创建RocketMQ生产者实例
// 参数说明:
// - producer.WithNsResolver: 设置NameServer地址解析器,指定NameServer地址为"127.0.0.1:9876"
// - producer.WithRetry: 设置消息发送失败时的重试次数为2次
p, err := rocketmq.NewProducer(
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
producer.WithRetry(2),
)
if err != nil {
fmt.Printf("create producer error: %s\n", err.Error())
os.Exit(1)
}
err = p.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
os.Exit(1)
}
// 延迟执行生产者关闭操作,确保在main函数结束前关闭生产者连接并释放资源
defer p.Shutdown()
// 发送同步消息到RocketMQ服务器
// 构建消息对象,指定主题和消息内容
msg := &primitive.Message{
Topic: "TestTopic",
Body: []byte("Hello RocketMQ from Go!"),
}
// 使用生产者同步发送消息
// 参数说明:
// - context.Background(): 使用空上下文进行消息发送
// - msg: 要发送的消息对象
// 返回值说明:
// - res: 消息发送结果,包含消息ID等信息
// - err: 发送过程中可能发生的错误
res, err := p.SendSync(context.Background(), msg)//异步发送消息使用的是 SendAsync 方法
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: %s\n", res.String())
}
}
三、消费者
推模式示例:
import (
"context"
"fmt"
"os"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)
func main() {
// 创建RocketMQ推模式消费者实例
// 参数说明:
// - consumer.WithGroupName: 设置消费者组名称为"TestGroup"
// - consumer.WithNsResolver: 设置NameServer地址解析器,指定NameServer地址为"127.0.0.1:9876"
// 返回值说明:
// - c: 创建的PushConsumer实例
// - err: 创建过程中可能发生的错误
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName("TestGroup"),
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
)
if err != nil {
fmt.Printf("create consumer error: %s\n", err.Error())
os.Exit(1)
}
// 订阅指定主题的消息
// 参数说明:
// - "TestTopic": 要订阅的主题名称
// - consumer.MessageSelector{}: 消息选择器,此处使用默认选择器
// - func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error): 消息处理回调函数
// - ctx: 上下文参数
// - msgs: 接收到的消息列表
// - 返回值: 消费结果和可能的错误
// 返回值说明:
// - err: 订阅过程中可能发生的错误
err = c.Subscribe("TestTopic", consumer.MessageSelector{}, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
fmt.Printf("Received message: %s\n", string(msg.Body))
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Printf("subscribe error: %s\n", err.Error())
os.Exit(1)
}
err = c.Start()
if err != nil {
fmt.Printf("start consumer error: %s\n", err.Error())
os.Exit(1)
}
defer c.Shutdown()
// 阻塞主 goroutine,防止程序退出
select {}
}
消费者有推模式和拉模式两种模式,如下
1、推模式
优点:
实时性高 消息到达 Broker 后几乎立即被投递给消费者(延迟低),适合对实时性要求高的场景。
RocketMQ Push Consumer 会自动进行 队列分配(Rebalance),多个消费者实例能自动分摊 Topic 的消息队列
消费成功后自动提交 offset;失败可重试(支持重试 Topic)
缺点:
消费者被动接收,难以控制消费速率
消息由 Broker 主动“推送”(通过长轮询模拟),消费者无法决定何时拉、拉多少。
如果消息突发洪峰,消费者可能被瞬间压垮(即使有流控,也可能来不及响应)
常用场景:
实时消息处理(如订单支付通知、即时通讯)
希望快速消费、低延迟
不想手动管理拉取逻辑和 offset
2、拉模式
优点:
消费者自己控制何时、从哪个队列、拉多少条消息,完全手动管理消费进度(offset)。
可精确控制拉取频率、批量大小、消费速度,适合复杂调度逻辑。
缺点:
开发复杂度高,易出错,必须手动管理 offset,无自动负载均衡
实时性差,延迟不可控。若为了低延迟频繁拉取,又会增加 Broker 压力,产生大量空轮询(浪费网络和 CPU)
常用场景:
消费速度需要严格控制(如限流、批处理)
需要自定义消费策略(如按优先级消费)
消费逻辑复杂,需精细控制 offset
到此这篇关于go语言使用RocketMq的示例代码的文章就介绍到这了,更多相关go语言使用RocketMq内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
go语言题解LeetCode1299将每个元素替换为右侧最大元素
这篇文章主要为大家介绍了go语言LeetCode刷题1299将每个元素替换为右侧最大元素示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-01-01


最新评论