Laravel中Kafka的使用详解

 更新时间:2021年03月22日 11:19:20   作者:李岚加洛斯  
这篇文章主要介绍了Laravel中Kafka的使用详解,kafka是一个分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力,有对于消息队列感兴趣的同学可以参考下

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?php
namespace App\Tools;
 
use Illuminate\Config\Repository;
 
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
 
use Illuminate\Http\Request;
 
class Kafka
{
  public $broker_list = '127.0.0.1';//配置kafka,可以用逗号隔开多个kafka
  public $topic = 'test';//管道名称
  public $partition = 0;
 
  protected $producer = null;
  protected $consumer = null;
 
  public function __construct()
  {
    if (empty($this->broker_list)) {
      throw new InvalidConfigException("broker not config");
    }
    $rk = new \RdKafka\Producer();
    if (empty($rk)) {
      throw new InvalidConfigException("producer error");
    }
    $rk->setLogLevel(LOG_DEBUG);
    if (!$rk->addBrokers($this->broker_list)) {
      throw new InvalidConfigException("producer error");
    }
    $this->producer = $rk;
  }
 
  /**
   * 生产者
   * @param array $messages
   * @return mixed
   */
  public function send($messages = [],$topic)
  {
    $topic = $this->producer->newTopic($topic);
    return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
  }
 
  /**
   * 消费者
   */
  public function consumer($object, $callback){
    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 0);
    $conf->set('metadata.broker.list', $this->broker_list);
 
    $topicConf = new \RdKafka\TopicConf();
    $topicConf->set('auto.offset.reset', 'smallest');
 
    $conf->setDefaultTopicConf($topicConf);
 
    $consumer = new \RdKafka\KafkaConsumer($conf);
 
    $consumer->subscribe([$this->topic]);
 
    echo "waiting for messages.....\n";
    while(true) {
      $message = $consumer->consume(120*1000);
      switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
          echo "message payload....";
          $object->$callback($message->payload);
          break;
      }
      sleep(1);
    }
  }
}
?>

在控制器中如何使用:

首先再头部导入这个类:use App\Tools\Kafka;

下面是使用生产者实例:

public function test(){
 
   $topic = 'tool';//输入使用管道名称
   $data['shop_id'] = 58;
   $data['bar_code']=586;
   $data['goods_num'] = 1;
   $data['goods_unit'] = '个';
 
$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
var_dump($Error_Msg);
 
 
  }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php
 
$conf = new RdKafka\Conf();
 
$conf->set('group.id', 'myConsumerGroup');
 
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
 
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
 
$topic = $rk->newTopic("tool", $topicConf);//读取的管道
 
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 
while (true) {
  $message = $topic->consume(0, 120*10000);
  switch ($message->err) {
    case RD_KAFKA_RESP_ERR_NO_ERROR:
    //没有错误打印信息
      $message = json_decode(json_encode($message),true);
      $data = json_decode($message['payload'],true);
      var_dump($data);
      break;
    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "等待接收信息\n";
      break;
    case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "超时\n";
      break;
    default:
      throw new \Exception($message->errstr(), $message->err);
      break;
  }
 sleep(1);
}
 
?>

到此这篇关于Laravel中Kafka的使用详解的文章就介绍到这了,更多相关Laravel中Kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • PHP三种方式实现链式操作详解

    PHP三种方式实现链式操作详解

    在php中有很多字符串函数,本篇文章主要介绍了PHP三种方式实现链式操作详解,具有一定的参考价值,有兴趣的可以了解一下。
    2017-01-01
  • ThinkPHP中U方法的使用浅析

    ThinkPHP中U方法的使用浅析

    这篇文章主要介绍了ThinkPHP中U方法的使用,需要的朋友可以参考下
    2014-06-06
  • php根据操作系统转换文件名大小写的方法

    php根据操作系统转换文件名大小写的方法

    这篇文章主要介绍了php根据操作系统转换文件名大小写的方法,需要的朋友可以参考下
    2014-02-02
  • php+layui数据表格实现数据分页渲染代码

    php+layui数据表格实现数据分页渲染代码

    今天小编就为大家分享一篇php+layui数据表格实现数据分页渲染代码,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2019-10-10
  • PHP sdk文档处理常用代码示例解析

    PHP sdk文档处理常用代码示例解析

    这篇文章主要介绍了PHP sdk文档处理常用代码示例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-12-12
  • 利用PHPStorm如何开发Laravel应用详解

    利用PHPStorm如何开发Laravel应用详解

    这篇文章主要给大家介绍了关于利用PHPStorm如何开发Laravel应用的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面跟着小编来一起学习学习下吧。
    2017-08-08
  • php遍历目录与文件夹的多种方法详解

    php遍历目录与文件夹的多种方法详解

    介绍几个php遍历目录的方法,可以遍历目录及目录中的文件,供大家参考
    2013-11-11
  • 最新版本PHP 7 vs HHVM 多角度比较

    最新版本PHP 7 vs HHVM 多角度比较

    PHP 7 是 PHP 社区对 HHVM 的回应。PHP 7 发布的预览版本号称比之前的 PHP 5 的性能要提升100%。不过,PHP 还有一个竞争对手 HHVM (HipHop Virtual Machine) 一个运行 PHP 代码的虚拟工具。二者直接的比较正在升温,那么让我们来看一下他们直接的性能对比吧
    2016-02-02
  • php IIS日志分析搜索引擎爬虫记录程序

    php IIS日志分析搜索引擎爬虫记录程序

    由于最近比较忙,代码写得不怎么规范,界面也没有怎么美化,大家先用着吧,以后增加新功能会第一时间发布给大家!
    2008-08-08
  • Yii框架参数化查询中IN查询只能查询一个的解决方法

    Yii框架参数化查询中IN查询只能查询一个的解决方法

    这篇文章主要介绍了Yii框架参数化查询中IN查询只能查询一个的解决方法,结合实例形式分析了Yii框架中IN查询只能查一个的原因及FIND_IN_SET函数相关功能与使用技巧,需要的朋友可以参考下
    2017-05-05

最新评论