Laravel:安装Kafka

composer require nmred/kafka-php

生产者

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Kafka;

class ProducerController extends Controller
{
    //这里是生产者
    public function getProducer()
    {
        $config = \Kafka\ProducerConfig::getInstance();
        //  Topic的元信息刷新的间隔
        $config->setMetadataRefreshIntervalMs(10000);
        //  设置broker地址
        $config->setMetadataBrokerList('127.0.0.1:9092');
        //  设置broker的代理版本
        $config->setBrokerVersion('1.0.0');
        //  只需leader确认消息
        $config->setRequiredAck(1);
        //  选择异步
        $config->setIsAsyn(false);
        //  每500毫秒发送消息
        $config->setProduceInterval(500);
        //  创建生产者实例
        $producer = new \Kafka\Producer();
            $producer->send([
                [
                    'topic' => 'test2',
                    'value' => 'test...message',
                ],
            ]);
    }
}


消费者

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Kafka;

class ConsumerKafka extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer:kafka';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '消费kafka';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:9092');
        $config->setGroupId('test2');
        $config->setBrokerVersion('1.0.0');
        $config->setTopics(['test2']);
        $consumer = new \Kafka\Consumer();
        $consumer->start(function($topic, $part, $message) {
            var_dump($message);
        });
    }
}

执行命令即可:

php artisan consumer:kafka

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐