Laravel下Kafka的使用
<?phpnamespace App\Http\Controllers;use Illuminate\Http\Request;use Kafka\Consumer;use Kafka\ConsumerConfig;use Kafka\Producer;use Kafka\ProducerConfig;//use Kafka;class KafkaController extends Con
·
Laravel:安装 Kafka 的 PHP 客户端(kafka-php)
composer require nmred/kafka-php
生产者
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Kafka;
class ProducerController extends Controller
{
    /**
     * Producer endpoint: configures the shared kafka-php producer settings
     * and sends a single test message to the "test2" topic.
     *
     * Nothing is returned from this action.
     */
    public function getProducer()
    {
        $producerConfig = \Kafka\ProducerConfig::getInstance();

        // Broker to connect to, and the broker version to speak.
        $producerConfig->setMetadataBrokerList('127.0.0.1:9092');
        $producerConfig->setBrokerVersion('1.0.0');

        // Refresh interval for topic metadata, in milliseconds.
        $producerConfig->setMetadataRefreshIntervalMs(10000);

        // Require acknowledgement from the partition leader only.
        $producerConfig->setRequiredAck(1);

        // Async mode is switched OFF here (false), i.e. the producer is used
        // synchronously: it is built without a callback and send() is given
        // the messages directly.
        $producerConfig->setIsAsyn(false);

        // Produce interval of 500 ms.
        // NOTE(review): presumably only relevant in async mode - confirm
        // against the kafka-php documentation.
        $producerConfig->setProduceInterval(500);

        $messages = [
            [
                'topic' => 'test2',
                'value' => 'test...message',
            ],
        ];

        // Create the producer and send the batch.
        $kafkaProducer = new \Kafka\Producer();
        $kafkaProducer->send($messages);
    }
}
消费者
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Kafka;
class ConsumerKafka extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer:kafka';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '消费kafka';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Configures the shared kafka-php consumer settings (broker
     * 127.0.0.1:9092, group "test2", topic "test2") and starts a consumer
     * whose callback dumps every received message.
     *
     * @return int Exit status; 0 once the consumer has returned control.
     */
    public function handle()
    {
        $config = \Kafka\ConsumerConfig::getInstance();

        // Refresh interval for topic metadata, in milliseconds.
        $config->setMetadataRefreshIntervalMs(10000);
        // Broker to connect to, and the broker version to speak.
        $config->setMetadataBrokerList('127.0.0.1:9092');
        $config->setBrokerVersion('1.0.0');
        // Consumer group id and the topics to subscribe to.
        $config->setGroupId('test2');
        $config->setTopics(['test2']);

        $consumer = new \Kafka\Consumer();

        // The callback receives the topic, the partition and the message;
        // only the message is used here and it is dumped as-is.
        // NOTE(review): start() presumably blocks while consuming - confirm
        // against the kafka-php documentation.
        $consumer->start(function ($topic, $part, $message) {
            var_dump($message);
        });

        // The docblock promises an int; return an explicit success status
        // instead of falling off the end of the method with null.
        return 0;
    }
}
执行命令即可:
php artisan consumer:kafka
更多推荐
所有评论(0)