一文详解Golang连接kafka的基本操作

 更新时间:2025年03月10日 10:39:04   作者:执念斩长河  
这篇文章主要为大家详细介绍了Golang中连接kafka的基本操作的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下

1.kafka的学习

1.1 启动kafka与zookeeper

kafka与zookeeper是相关联的

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

1.2 创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server 主机名:9092

1.3 生产消息

bin/kafka-console-producer.sh --broker-list 主机名:9092 --topic hello

运行后可以发送多条,ctrl+c退出

1.4 消费之前的消息

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --from-beginning --topic hello

1.5 指定偏移量消费

bin/kafka-console-consumer.sh --bootstrap-server 主机名:9092 --partition 0 --offset 1 --topic hello

1.6 消费最新的信息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

2 go操作

2.1 发送消息

// Kafka 配置
const (
	KafkaBroker = "u8sMaster:9092" // 替换为你的 Kafka Broker 地址
	KafkaTopic  = "k8s-version"          // Kafka 主题
)

func main() {
	sendMesgKafka()
}

func sendMesgKafka() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{KafkaBroker},
		Topic:    KafkaTopic,
		Balancer: &kafka.LeastBytes{},
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("one!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("two!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("three!"),
		},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}

	fmt.Println("Message sent successfully")

}

2.2 消费消息

// to consume messages
topic := "test"
partition := 0

conn, err := kafka.DialLeader(context.Background(), "tcp", "u8sMaster:9092", topic, partition)
if err != nil {
    log.Fatal("failed to dial leader:", err)
}

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n]))
}

if err := batch.Close(); err != nil {
    log.Fatal("failed to close batch:", err)
}

if err := conn.Close(); err != nil {
    log.Fatal("failed to close connection:", err)
}

2.3 列出所有topic

func main() {
    conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()
    
    partitions, err := conn.ReadPartitions()
    if err != nil {
        panic(err.Error())
    }
    
    m := map[string]struct{}{}
    
    for _, p := range partitions {
        m[p.Topic] = struct{}{}
    }
    for k := range m {
        fmt.Println(k)
    }
}

2.4 创建topic

func main() {
        conn, err := kafka.DialLeader(context.Background(), "tcp", "u9sMaster:9092", "topic2", 0)
        if err != nil {
            panic(err.Error())
        }
}

精准地创建topic

func main() {
    conn, err := kafka.Dial("tcp", "u8sMaster:9092")
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()
    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }
    var connLeader *kafka.Conn
    connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer connLeader.Close()
}

这里省略了kafka集群的配置,未来有机会补充

以上就是一文详解Golang连接kafka的基本操作的详细内容,更多关于go连接kafka的资料请关注脚本之家其它相关文章!

相关文章

  • Go实现分布式系统高可用限流器实战

    Go实现分布式系统高可用限流器实战

    这篇文章主要为大家介绍了Go实现分布式系统高可用限流器实战,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-06-06
  • 浅析Go语言中的超时控制

    浅析Go语言中的超时控制

    日常开发中我们大概率会遇到超时控制的场景,而一个良好的超时控制可以有效的避免一些问题,所以本文就来和大家深入探讨一下Go语言中的超时控制吧
    2023-10-10
  • Go高级特性探究之优先级队列详解

    Go高级特性探究之优先级队列详解

    Heap 是一种数据结构,这种数据结构常用于实现优先队列,这篇文章主要就是来和大家深入探讨一下GO语言中的优先级队列,感兴趣的可以了解一下
    2023-06-06
  • 手把手带你走进Go语言之条件表达式

    手把手带你走进Go语言之条件表达式

    条件表达式由条件运算符构成,并常用条件表达式构成一个赋值语句,本文给大家介绍了在Go语言中条件表达式的具体用法,讲述的非常详细,对大家的学习或工作具有一定的参考借鉴价值
    2021-09-09
  • 详解Go语言的错误处理和资源管理

    详解Go语言的错误处理和资源管理

    资源处理是什么?打开文件需要关闭,打开数据库连接,连接需要释放。这些成对出现的就是资源管理。有时候我们虽然释放了,但是程序在中间出错了,那么可能导致资源释放失败。如何保证打开的文件一定会被关闭呢?这就是资源管理与错误处理考虑的一个原因
    2021-06-06
  • Golang使用channel实现一个优雅退出功能

    Golang使用channel实现一个优雅退出功能

    最近补 Golang channel 方面八股的时候发现用 channel 实现一个优雅退出功能好像不是很难,之前写的 HTTP 框架刚好也不支持优雅退出功能,于是就参考了 Hertz 优雅退出方面的代码,为我的 PIANO 补足了这个 feature
    2023-03-03
  • Go与Redis实现分布式互斥锁和红锁

    Go与Redis实现分布式互斥锁和红锁

    这篇文章主要介绍了Go与Redis实现分布式互斥锁和红锁,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • Go数组与切片轻松掌握

    Go数组与切片轻松掌握

    在Java的核心库中,集合框架可谓鼎鼎大名:Array、List、Set等等,随便拎一个出来都值得开发者好好学习如何使用甚至是背后的设计源码。虽然Go语言没有如此丰富的容器类型,但也有一些基本的容器供开发者使用,接下来让我们认识一下这些容器类型吧
    2022-11-11
  • golang gorm更新日志执行SQL示例详解

    golang gorm更新日志执行SQL示例详解

    这篇文章主要为大家介绍了golang gorm更新日志执行SQL示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪
    2022-04-04
  • Go 结构体、数组、字典和 json 字符串的相互转换方法

    Go 结构体、数组、字典和 json 字符串的相互转换方法

    今天小编就为大家分享一篇Go 结构体、数组、字典和 json 字符串的相互转换方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-08-08

最新评论