详解C/C++如何发送与接收Kafka消息

 更新时间:2024年07月17日 09:53:20   作者:周先生FullStack  
系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等,算法工程为C/C++工程,本文将介绍如何在C/C++中如何发送与接收Kakfa消息(包含:Kafka的SASL认证方式),并提供了详细的源码和讲解,需要的朋友可以参考下

一、背景

在实际工程中,难免会遇到不通系统之间通信,如何进行系统之间通信呢?(作为一个“全栈工程师”,必须要解决它!)。

系统之间通信方式很多如:系统之间调用(http/rpc等),异步间接调用如发送消息、公共存储等。目前,本人从事的项目中遇到web业务工程(Java)依赖与算法工程(C++) 处理的视频/图片分类与标记结果。两个系统之前数据通信采用了kafka消息方式。

算法工程为C/C++工程,本文将介绍如何在C/C++中如何发送与接收Kakfa消息(包含:Kafka的SASL认证方式),并提供了详细的源码和讲解。(至于Java中如何发送与接收Kakfa消息如有需要,可留言或私聊!)

二、环境依赖安装

# 下载librdkafka
git clone https://github.com/edenhill/librdkafka.git
 
# 编译
cd librdkafka
./configure --prefix=/usr/local
 
# 安装
sudo make install
 
# 验证:查看/usr/local/lib目录下是否有librdkafka文件
ls /usr/local/lib | grep kafka

三、编写kakfa生产者消费者

3.1 生产者

#include <rdkafka.h>     // 包含C API头文件
#include <iostream>
#include <cstring>
#include <cerrno>
  
int main() {
    const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
    const char *topic_name = "kafka_msg_topic_test";
    const char *payload = "Hello, Kafka from librdkafka!";
    size_t len = strlen(payload);
  
    // 创建配置对象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf) {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
  
    // 设置broker地址
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 创建生产者实例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
    if (!rk) {
        std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 创建topic句柄(可选,但推荐)
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
    if (!rkt) {
        std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
//        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 发送消息
    int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区
    int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
    } else {
        std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
    }
  
    // 等待所有消息发送完成(可选,但推荐)
    // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认
    int msgs_sent = 0;
    while (rd_kafka_outq_len(rk) > 0) {
        rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去
        msgs_sent += rd_kafka_outq_len(rk);
    }
  
    // 销毁topic句柄
    rd_kafka_topic_destroy(rkt);
  
    // 销毁生产者实例
    rd_kafka_destroy(rk);
  
    // 销毁配置对象
//    rd_kafka_conf_destroy(conf);
  
    return 0;
}

3.2 消费者

#include <rdkafka.h>
#include <iostream>
#include <cerrno>
#include <cstring>
#include <cstdlib>
  
void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
    // 错误处理回调
    std::cerr << "Kafka error: " << err << ": " << reason << std::endl;
}
  
int main() {
    std::cerr << "start " << std::endl;
    const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
    const char *group_id = "kafka_msg_topic_test"; // 消费者组ID
    const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名称
    
    // 创建配置对象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf) {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    
    // 设置broker地址
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    
    // 设置消费者组ID
    if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    
    // 设置错误处理回调(可选)
    rd_kafka_conf_set_error_cb(conf, error_cb);
    
    // 创建消费者实例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
    if (!rk) {
        std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    
    
    // 创建一个topic分区列表
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    if (!topics) {
        std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
        return 1;
    }
    
    // 添加topic到分区列表
    if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) {
        std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(rk);
        return 1;
    }
    // 订阅topic
    rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(rk);
        return 1;
    }
    
    // 销毁分区列表(订阅后不再需要)
    rd_kafka_topic_partition_list_destroy(topics);
    
    
    // 轮询消息
    while (true) {
        rd_kafka_message_t *rkmessage;
        rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以获取消息
        
        if (rkmessage == NULL) {
            // 没有消息或者超时
            continue;
        }
        
        if (rkmessage->err) {
            // 处理错误
            if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                // 消息流的末尾
                std::cout << "End of partition event" << std::endl;
            } else {
                // 打印错误并退出
                std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl;
                break;
            }
        } else {
            // 处理消息
            std::cout << "Received message at offset " << rkmessage->offset
            << " from partition " << rkmessage->partition
            << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len
            << " value :" <<(char *)rkmessage->payload
            << std::endl;
            
            // 如果需要,可以在这里处理消息内容
            // 例如,使用rkmessage->payload()获取消息内容
            
            // 释放消息
            rd_kafka_message_destroy(rkmessage);
        }
    }
    
    // 清理
    rd_kafka_destroy(rk);
    
    return 0;
}

3.3 编译运行

3.3.1 编译生产者消费者

g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread

3.3.2 运行验证

执行时,若出现错误: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory

则需要执行下面环境变量配置:

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

生产者:发送消息

消费者:接收消息

3.4 SASL认证kakfa

下面是,支持sasl认证的kakka生产者完整代码

#include <rdkafka.h>
#include <iostream>
#include <cstring>
#include <cerrno>
 
int main(int argc, char *argv[]) {
    const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址
    const char *username = "xxx";
    const char *password = "xxx";
    const char *topic_name = "kafka_msg_test_sasl";
    const char *payload = "Hello, Kafka from librdkafka! sasl";
    size_t len = strlen(payload);
     // 初始化配置
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf)
    {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    char errstr[512]; // 声明一个足够大的字符数组来存储错误信息
 
    // 设置SASL相关的配置
    if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
 
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    // 检查配置是否设置成功
    if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set configuration: " << errstr << std::endl;
        return 1;
    }
 
    // 创建producer实例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << "Failed to create new producer: " << errstr << std::endl;
        return 1;
    }
 
    // 创建topic句柄(可选,但推荐)
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
    if (!rkt)
    {
        std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
        //        rd_kafka_conf_destroy(conf);
        return 1;
    }
 
    // 发送消息
    int32_t partition = RD_KAFKA_PARTITION_UA; // 自动选择分区
    int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
    }
    else
    {
        std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
    }
 
    // 等待所有消息发送完成(可选,但推荐)
    // 在实际生产代码中,您可能需要更复杂的逻辑来处理消息的发送和确认
    int msgs_sent = 0;
    while (rd_kafka_outq_len(rk) > 0)
    {
        rd_kafka_poll(rk, 100); // 轮询Kafka队列,直到所有消息都发送出去
        msgs_sent += rd_kafka_outq_len(rk);
    }
 
    // 销毁topic句柄
    rd_kafka_topic_destroy(rkt);
 
    // 清理资源
    rd_kafka_destroy(rk);
    return 0;
}

在kafka map 管理界面中查看发送效果如下:

3.5 结束语

到此这篇关于详解C/C++如何发送与接收Kafka消息的文章就介绍到这了,更多相关C/C++发送与接收Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • C++封装IATHOOK类实例

    C++封装IATHOOK类实例

    这篇文章主要介绍了C++封装IATHOOK类的实现方法,对IAT的HOOK实例进行了封装,非常具有实用价值,需要的朋友可以参考下
    2014-10-10
  • C语言简明讲解归并排序的应用

    C语言简明讲解归并排序的应用

    这篇文章主要介绍了 c语言排序之归并排序,归并就是把两个或多个序列合并,文中通过示例代码介绍的非常详细。对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-05-05
  • C语言内存泄露很严重的解决方案

    C语言内存泄露很严重的解决方案

    这篇文章主要介绍了C语言内存泄露很严重的解决方案,预防内存泄漏问题有多种方法,比如加强代码检视、工具检测和内存测试等,下面文章总结内容需要的小伙伴可以参考一下
    2022-05-05
  • C语言 深入探究动态规划之区间DP

    C语言 深入探究动态规划之区间DP

    这几天在做有关dp的题,看到一个石子合并的问题,本来以为是个贪心,后来仔细一想压根不是贪心。贪心算法的思路是每次都取最大的,然而石子合并问题有个限制条件就是每次只能取相邻的,这就决定了它不是个贪心
    2022-04-04
  • C++高级数据结构之二叉查找树

    C++高级数据结构之二叉查找树

    这篇文章主要介绍了C++高级数据结构之二叉查找树,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-05-05
  • C语言实现的顺序表功能完整实例

    C语言实现的顺序表功能完整实例

    这篇文章主要介绍了C语言实现的顺序表功能,结合完整实例形式分析了C语言顺序表的创建、添加、删除、排序、合并等相关操作技巧,需要的朋友可以参考下
    2018-04-04
  • Visual Studio 2019 如何新建 Win32项目的方法步骤

    Visual Studio 2019 如何新建 Win32项目的方法步骤

    这篇文章主要介绍了Visual Studio 2019 如何新建 Win32项目的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03
  • C语言中“不受限制”的字符串函数总结

    C语言中“不受限制”的字符串函数总结

    这篇文章主要给大家总结介绍了C语言中一些“不受限制”的字符串函数,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • C语言中的malloc使用详解

    C语言中的malloc使用详解

    这篇文章主要介绍了C语言中的malloc的使用,包括用其动态申请二维数组等功能,需要的朋友可以参考下
    2015-08-08
  • C++用mysql自带的头文件连接数据库

    C++用mysql自带的头文件连接数据库

    现在正做一个接口,通过不同的连接字符串操作不同的数据库。要用到mysql数据库。通过网上的一些资料和自己的摸索,大致清楚了C++连接mysql的方法。可以通过2种方法实现。第一种方法是利用ADO连接,第二种方法是利用mysql自己的api函数进行连接。今天主要来讲解下使用API
    2016-07-07

最新评论