本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

  • 生产者配置参数
  • 创建生产者实例
  • 创建主题实例(依赖生产者)
  • 生产主题消息
  • 推送消息

具体代码如下:

        $conf = new \RdKafka\Conf();
        // 绑定服务节点
        $conf->set('metadata.broker.list', '127.0.0.1:32772');

        // 创建生产者
        $kafka = new \RdKafka\Producer($conf);

        // 创建主题实例
        $topic = $kafka->newTopic('p1r1');
        // 生产主题数据,此时消息在缓冲区中,并没有真正被推送
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
        // 阻塞时间(毫秒), 0为非阻塞
        $kafka->poll(0); 

        // 推送消息,如果不调用此函数,消息不会被发送且会丢失
        $result = $kafka->flush(5000);

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }

消费者

创建一个消费者需要几个步骤:

  • 消费者配置参数
  • 应用配置参数创建消费者实例
  • 订阅对应主题
  • 拉取数据
  • 提交位移

具体代码如下:

        $conf = new \RdKafka\Conf();
        // 绑定消费者组
        $conf->set('group.id', 'ceshi');
        // 绑定服务节点,多个用,分隔
        $conf->set('metadata.broker.list', '127.0.0.1:32787');
        // 设置自动提交为false
        $conf->set('enable.auto.commit', 'false');
        // 设置当前消费者拉取数据时的偏移量, 可选参数:
        // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
        // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
        $conf->set('auto.offset.reset', 'earliest');

        // 创建消费者实例
        $consumer = new \RdKafka\KafkaConsumer($conf);
        // 消费者订阅主题,数组形式
        $consumer->subscribe(['topic1','topic2']);
        while (true) {
            // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
            $message = $consumer->consume(5000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    // 业务逻辑
                    var_dump($message);

                    // 提交位移
                    $consumer->commit($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
        // 关闭消费者(一般用在脚本中,不需要关闭)
        $conumser->close();

只消费指定分区中的数据:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用
    $consumer->assign([
        new RdKafka\TopicPartition("topic", 0),
        new RdKafka\TopicPartition("topic", 1),
    ]);

php入门到就业线上直播课:立即学习
全程直播 + 实战授课 + 边学 + 边练 + 边辅导

以上就是详解PHP实现生产者与消费者(Kafka应用)的详细内容,更多请关注php中文网其它相关文章!

声明:本文转载于:learnku,如有侵犯,请联系admin@php.cn删除

  • 相关标签:laravel PHP
  • 程序员必备接口测试调试工具:点击使用

    Apipost = Postman + Swagger + Mock + Jmeter

    Api设计、调试、文档、自动化测试工具

    网页生成APP,用做网站的技术去做APP:立即创建

    手机网站开发APP、自助封装APP、200+原生模块、2000+映射JS接口按需打包

    • 上一篇:浅谈PHP运行Python脚本的方法
    • 下一篇:PHP中rename()的奇妙百科

    相关文章

    相关视频


    • 详解PHP7语言基础
    • PHP如何获取指定网址的header头信息及隐藏关...
    • 宝塔面板下怎么安装Mosquitto-php扩展
    • 如何利用php获取ip地址
    • 解决ThinkPHP里无法输出图片问题(关于设置响...
    • 详解PHP实现生产者与消费者(Kafka应用)
    • 安装php8
    • php数组循环
    • 条件判断、PHP8新特性
    • 函数、PHP8新特性

    视频教程分类

    • php视频教程
    • html视频教程
    • css视频教程
    • JS视频教程
    • jQuery视频教程
    • mysql视频教程
    • Linux视频教程
    • Python视频教程
    • Laravel视频教程
    • Vue视频教程

    专题

    详解PHP实现生产者与消费者(Kafka应用)