golang微服务框架Kratos实现消息队列

 更新时间:2025年12月15日 10:32:00   作者:喵了几个咪  
本文介绍了在Golang微服务框架Kratos中实现消息队列的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

什么是消息队列

MQ就是消息队列,是Message Queue的缩写。消息队列是一种通信方式。消息的本质就是一种数据结构。因为MQ把项目中的消息集中式的处理和存储,所以MQ主要有解耦,并发,和削峰的功能。

为什么要使用消息队列

1. 异步

通常的微服务实现里面,都是通过RPC进行微服务之间的相互调用,这是同步的。如果消息队列的话,可以实现异步的调用。至于异步有啥好处呢,主要是为了削峰。

2. 削峰

同步的调用会带来一个问题:瞬时流量。客户的调用同步接口节奏,你是无法把控的,流量将会是忽高忽低的,猛的来一波,搞不好系统就崩了溃了。

如果消息队列的话,可以实现异步的调用,并且可以实现削峰,请求进来,我先放到消息队列里面去,慢慢消化掉,不至于猛的来一下,把系统击垮。

3. 解耦

通常的微服务实现里面,都是通过RPC进行微服务之间的相互调用,那么意味着,你要做到一件事情,你必须要知道做事情的对方是谁。在微服务的世界里面,如果设计得不好,那就是一团糟的相互调用网络,看得你晕晕的,运维会疯,后面接手的开发人员也得疯。

应用了消息队列,你就只需要跟消息队列这个代理打交道,单线联系,关系简单。我们只需要生产消息,消费消息,至于是谁消费的,谁生产的,完全不用去管它。架构上,就会清爽多了。所以,要对服务进行解耦,消息队列是一个很好的选择。

Kratos与消息队列

Kratos现在的版本(v2.2.1)中,还没有对消息队列的直接支持,但是要运用还是容易的。官方有一个空壳示例代码BeerShop,可以看到,在data层,使用Kafka的痕迹。

对于在Kratos微服务框架里面应用消息队列,我认为有两种方式可以实现:

  1. 在data层,使用消息队列,但是在这个层,你只能在那生产消息,而不好消费消息。
  2. 将消息队列的客户端实现为微服务的一个Server,然后在微服务的Service中消费消息和生产消息。

第一个方式的应用面不广,更多的时候,第二种方式的应用面会更广一些,我选择了第二种方式。但是,Kratos官方并没有支持这一种方式。故而,我只能够自己动手实现了,我从另外一个微服务Go-Micro里面提取了其Broker的实现,并且将其实现为Kratos框架里面的一个Server。事实证明,这样是可行的,并且很好使。

你可能会问,为什么我不直接使用Go-Micro呢?因为Go-Micro是一个很重的微服务框架,尽管它的功能很丰富,几乎支持了大部分的微服务需求。但是对于一个应用来说,我并不需要使用所有的技术栈、中间件,我只需要部分的技术栈。所以,我宁愿做加法,也不愿意去做减法。对于服务端来说,可控、可用、可维护是最重要的。极简,是一个很好的选择。另外,我还要腹诽一点,我从Go-Micro提取出来的Broker在测试的过程中发现,都有一些瑕疵。

我实现的代码,我放到了github:https://github.com/tx7do/kratos-transport

它所支持的协议和消息队列有:

  • Kafka
  • RabbitMQ
  • NATS
  • Redis
  • MQTT
  • WebSocket

基本上是够用了。

kratos-transport的应用

它主要分为了3个部分:

1. Codec 编解码器

编解码器现在使用的是Kratos的编解码器。

2. Broker 消息队列客户端

可以直接拿来使用,我拿Kafka举例:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/go-kratos/kratos/v2/encoding"
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/kafka"
)

const (
	testBrokers = "localhost:9092"
	testTopic   = "test_topic"
	testGroupId = "a-group"
)

type Hygrothermograph struct {
	Humidity    float64 `json:"humidity"`
	Temperature float64 `json:"temperature"`
}

func registerHygrothermographHandler() broker.Handler {
	return func(ctx context.Context, event broker.Event) error {
		var msg *Hygrothermograph = nil

		switch t := event.Message().Body.(type) {
		case []byte:
			msg = &Hygrothermograph{}
			if err := json.Unmarshal(t, msg); err != nil {
				return err
			}
		case string:
			msg = &Hygrothermograph{}
			if err := json.Unmarshal([]byte(t), msg); err != nil {
				return err
			}
		case *Hygrothermograph:
			msg = t
		default:
			return fmt.Errorf("unsupported type: %T", t)
		}

		if err := handleHygrothermograph(ctx, event.Topic(), event.Message().Headers, msg); err != nil {
			return err
		}

		return nil
	}
}

func handleHygrothermograph(_ context.Context, topic string, headers broker.Headers, msg *Hygrothermograph) error {
	log.Printf("Headers: %+v, Humidity: %.2f Temperature: %.2f\n", headers, msg.Humidity, msg.Temperature)
	return nil
}

func main() {
	ctx := context.Background()

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	b := kafka.NewBroker(
		broker.OptionContext(ctx),
		broker.Addrs(testBrokers),
		broker.Codec(encoding.GetCodec("json")),
	)

	_, err := b.Subscribe(testTopic,
		registerHygrothermographHandler(),
		func() broker.Any {
			return &Hygrothermograph{}
		},
		broker.SubscribeContext(ctx),
		broker.Queue(testGroupId),
	)
	if err != nil {
		fmt.Println(err)
	}

	<-interrupt
}

3. Server 封装给Kratos的Server实现

还是拿Kafka举例:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/encoding"
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/transport/kafka"
)

const (
	testBrokers = "localhost:9092"
	testTopic   = "test_topic"
	testGroupId = "a-group"
)

type Hygrothermograph struct {
	Humidity    float64 `json:"humidity"`
	Temperature float64 `json:"temperature"`
}

func registerHygrothermographHandler() broker.Handler {
	return func(ctx context.Context, event broker.Event) error {
		var msg *Hygrothermograph = nil

		switch t := event.Message().Body.(type) {
		case []byte:
			msg = &Hygrothermograph{}
			if err := json.Unmarshal(t, msg); err != nil {
				return err
			}
		case string:
			msg = &Hygrothermograph{}
			if err := json.Unmarshal([]byte(t), msg); err != nil {
				return err
			}
		case *Hygrothermograph:
			msg = t
		default:
			return fmt.Errorf("unsupported type: %T", t)
		}

		if err := handleHygrothermograph(ctx, event.Topic(), event.Message().Headers, msg); err != nil {
			return err
		}

		return nil
	}
}

func handleHygrothermograph(_ context.Context, topic string, headers broker.Headers, msg *Hygrothermograph) error {
	log.Printf("Humidity: %.2f Temperature: %.2f\n", msg.Humidity, msg.Temperature)
	return nil
}

func main() {
	ctx := context.Background()

	kafkaSrv := kafka.NewServer(
		kafka.WithAddress([]string{testBrokers}),
		kafka.WithCodec(encoding.GetCodec("json")),
	)

	_ = kafkaSrv.RegisterSubscriber(ctx,
		testTopic, testGroupId, false,
		registerHygrothermographHandler(),
		func() broker.Any {
			return &Hygrothermograph{}
		})

	app := kratos.New(
		kratos.Name("kafka"),
		kratos.Server(
			kafkaSrv,
		),
	)
	if err := app.Run(); err != nil {
		log.Println(err)
	}
}

另外再看一个例子,是Websocket的,它的应用其实也是很广的:

package main

import (
	"errors"
	"fmt"
	"log"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/encoding"
	"github.com/tx7do/kratos-transport/transport/websocket"
)

var testServer *websocket.Server

const (
	MessageTypeChat = iota + 1
)

type ChatMessage struct {
	Type    int    `json:"type"`
	Message string `json:"message"`
}

func main() {
	wsSrv := websocket.NewServer(
		websocket.WithAddress(":8800"),
		websocket.WithPath("/ws"),
		websocket.WithConnectHandle(handleConnect),
		websocket.WithCodec(encoding.GetCodec("json")),
	)

	testServer = wsSrv

	wsSrv.RegisterMessageHandler(MessageTypeChat,
		func(sessionId websocket.SessionID, payload websocket.MessagePayload) error {
			switch t := payload.(type) {
			case *ChatMessage:
				return handleChatMessage(sessionId, t)
			default:
				return errors.New("invalid payload type")
			}
		},
		func() websocket.Any { return &ChatMessage{} },
	)

	app := kratos.New(
		kratos.Name("websocket"),
		kratos.Server(
			wsSrv,
		),
	)
	if err := app.Run(); err != nil {
		log.Println(err)
	}
}

func handleConnect(sessionId websocket.SessionID, register bool) {
	if register {
		fmt.Printf("%s connected\n", sessionId)
	} else {
		fmt.Printf("%s disconnect\n", sessionId)
	}
}

func handleChatMessage(sessionId websocket.SessionID, message *ChatMessage) error {
	fmt.Printf("[%s] Payload: %v\n", sessionId, message)

	testServer.Broadcast(MessageTypeChat, *message)

	return nil
}

具体的应用实例

我写了一些实例代码,并且都已经提交到了Kratos的examples代码仓库中去了。

kratos-cqrs

这是一个简单的CQRS的实现,主要就是拿了Kafka来消费来自于传感器的遥感数据,然后把数据存储到数据库中去。

需要注意的是,这个实例并不够完整,我并没有实现MQTT的消费,没有实现前端页面等等。只实现了对Kafka的消费。

kratos-realtimemap

这是一个完整的物联网相关的例子,有前端,有后端,可以完整的跑起来看。

通过MQTT接收一个开放的公交遥测数据源,然后通过REST和Websocket向前端发送数据,在地图上展现出来车辆的轨迹、车辆的位置、车辆的速度、开关门状态等等。

kratos-chatroom

最简单的Websocket聊天室,客户端发送消息,服务端接收之后立即广播给其他客户端。

中间件代码

到此这篇关于golang微服务框架Kratos实现消息队列的文章就介绍到这了,更多相关golang Kratos消息队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅析go中的map数据结构字典

    浅析go中的map数据结构字典

    golang中的map是一种数据类型,将键与值绑定到一起,底层是用哈希表实现的,可以快速的通过键找到对应的值。这篇文章主要介绍了go中的数据结构字典-map,需要的朋友可以参考下
    2019-11-11
  • golang处理TIFF图像的实现示例

    golang处理TIFF图像的实现示例

    本文介绍了在Go语言中处理TIFF图像,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2025-03-03
  • goland设置颜色和字体的操作

    goland设置颜色和字体的操作

    这篇文章主要介绍了goland设置颜色和字体的操作方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-05-05
  • Go语言开发k8s之Service操作解析

    Go语言开发k8s之Service操作解析

    这篇文章主要为大家介绍了Go语言开发k8s之Service操作解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-06-06
  • Go Context库 使用基本示例

    Go Context库 使用基本示例

    在Go的http包中,每个请求由独立的goroutine处理,这些goroutine可能需要访问请求特定的数据或启动其他服务,Context在Go语言中提供了一种方式来传递请求域的数据、取消信号和截止时间,本文介绍Go Context库 使用基本示例,感兴趣的朋友跟随小编一起看看吧
    2024-09-09
  • golang JSON序列化和反序列化示例详解

    golang JSON序列化和反序列化示例详解

    通过使用Go语言的encoding/json包,你可以轻松地处理JSON数据,无论是在客户端应用、服务器端应用还是其他类型的Go程序中,这篇文章主要介绍了golang JSON序列化和反序列化,需要的朋友可以参考下
    2024-04-04
  • golang日志包logger的用法详解

    golang日志包logger的用法详解

    这篇文章主要介绍了golang日志包logger的用法详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-05-05
  • 使用go操作redis的有序集合(zset)

    使用go操作redis的有序集合(zset)

    这篇文章主要介绍了使用go操作redis的有序集合(zset),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-12-12
  • 详解如何在Go语言中生成随机种子

    详解如何在Go语言中生成随机种子

    这篇文章主要为大家详细介绍了如何在Go语言中生成随机种子,文中的示例代码讲解详细,具有一定的借鉴价值,有需要的小伙伴可以参考一下
    2024-04-04
  • Go基本数据类型的具体使用

    Go基本数据类型的具体使用

    本文主要介绍了Go的基本数据类型,包括布尔类型、整数类型、浮点数类型、复数类型、字符串类型,具有一定的参考价值,感兴趣的可以了解一下
    2023-11-11

最新评论