Golang操作Kafka的实现示例

 更新时间:2023年02月19日 09:00:04   作者:YUHAOHAO  
本文主要介绍了Golang操作Kafka的实现示例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

一.使用库说明

Golang中连接kafka可以使用第三方库:github.com/Shopify/sarama

二.Kafka Producer发送消息

package main 

import (
    "fmt"
    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner  //写到随机分区中,我们默认设置32个分区
    config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回

    // 构造一个消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "task"
    msg.Value = sarama.StringEncoder("producer kafka messages...")

    // 连接kafka
    client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
    if err != nil {
        fmt.Println("Producer closed, err:", err)
        return
    }
    defer client.Close()

    // 发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

三.Kafka Consumer消费消息

package main

import (
    "fmt"
    "github.com/Shopify/sarama"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
    if err != nil {
        fmt.Println("Failed to start consumer: %s", err)
        return
    }
    partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区
    if err != nil {
        fmt.Println("Failed to get the list of partition: ", err)
        return
    }
    fmt.Println(partitionList)

    for partition := range partitionList{ // 遍历所有的分区
        pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者
        if err != nil {
            fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
        }
        wg.Add(1)
        go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值
            for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待
                fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
            defer pc.AsyncClose()
            wg.Done()
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

到此这篇关于Golang操作Kafka的实现示例的文章就介绍到这了,更多相关Golang操作Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Go Error 嵌套实现创建方式

    Go Error 嵌套实现创建方式

    这篇文章主要介绍了Go Error 嵌套到底是怎么实现的?大家都知道创建error有两种方式分别是errors.new()另一种是fmt.errorf(),本文通过详细例子给大家介绍,需要的朋友可以参考下
    2022-01-01
  • 浅析Golang中的net/http路由注册与请求处理

    浅析Golang中的net/http路由注册与请求处理

    这篇文章主要为大家详细介绍了Golang中的net/http路由注册与请求处理的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2023-12-12
  • 下载golang.org/x包的操作方法

    下载golang.org/x包的操作方法

    今天小编就为大家分享一篇下载golang.org/x包的操作方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-07-07
  • 一文掌握Go语言并发编程必备的Mutex互斥锁

    一文掌握Go语言并发编程必备的Mutex互斥锁

    Go 语言提供了 sync 包,其中包括 Mutex 互斥锁、RWMutex 读写锁等同步机制,本篇博客将着重介绍 Mutex 互斥锁的基本原理,需要的可以参考一下
    2023-04-04
  • Golang中context包使用场景和示例详解

    Golang中context包使用场景和示例详解

    这篇文章结合示例代码介绍了context包的几种使用场景,文中有详细的代码示例,对学习或工作有一定的帮助,需要的朋友可以参考下
    2023-05-05
  • Go语言包管理模式示例分析

    Go语言包管理模式示例分析

    这篇文章主要为大家介绍了Go语言包管理模式示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-05-05
  • 使用go在mangodb中进行CRUD操作

    使用go在mangodb中进行CRUD操作

    这篇文章主要介绍了使用go在mangodb中进行CRUD操作,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-10-10
  • 重学Go语言之数组的具体使用详解

    重学Go语言之数组的具体使用详解

    Go的数组是一种复合数据类型,在平时开发中并不常用,更常用的是切片(slice),可以把切片看作是能动态扩容的数组,切片的底层数据结构就是数组,所以数组虽不常用,但仍然有必要掌握
    2023-02-02
  • Golang websocket协议使用浅析

    Golang websocket协议使用浅析

    这篇文章主要介绍了Golang websocket协议的使用,WebSocket是一种新型的网络通信协议,可以在Web应用程序中实现双向通信,感兴趣想要详细了解可以参考下文
    2023-05-05
  • Go语言实现汉诺塔算法

    Go语言实现汉诺塔算法

    之前的文章,我们给大家分享了不少汉诺塔算法的实现语言,包括C、c++、java、python等,今天我们就来使用go语言来实现一下,需要的小伙伴来参考下吧。
    2015-03-03

最新评论