php 安装 kafka扩展,laravel使用kafka

php 安装 kafka扩展,laravel使用kafka

PHP 没有官方的 Kafka 扩展,目前比较流行的是 php-rdkafka。该扩展用 C 语言编写,可以直接通过 pecl 安装。文档地址:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html 。安装该扩展有一个前提条件:必须先安装 librdkafka。

# Fetch and build librdkafka, the C library named above as the prerequisite of the rdkafka extension.
git clone https://github.com/edenhill/librdkafka.git
# The build must run inside the cloned checkout; ./configure is not present in the parent directory.
cd librdkafka
./configure
make
# NOTE(review): a system-wide install normally needs root privileges; run as root or prefix with sudo.
make install

安装好后,直接用pecl安装即可。

pecl install rdkafka
然后在 php.ini 中写入以下一行:
extension=rdkafka.so 

(截图略)也可以通过源码安装:

# Build the rdkafka PHP extension from source against the PHP installation under /usr/local/php.
PHP_BIN_DIR=/usr/local/php/bin

cd /usr/local/src/
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka

# Prepare the extension build for this PHP installation.
"${PHP_BIN_DIR}/phpize"
# Point configure at the php-config that belongs to the same PHP installation.
./configure --with-php-config="${PHP_BIN_DIR}/php-config"
# Compile with up to 5 parallel jobs, then install the built extension.
make all -j 5
make install

然后在 php.ini 中写入以下一行:
extension=rdkafka.so

使用 composer 安装依赖(注意:nmred/kafka-php 是纯 PHP 实现的 Kafka 客户端,本身并不依赖上面安装的 rdkafka 扩展;下文的示例代码使用的是 kafka-php 的 API):

composer require nmred/kafka-php

生产者:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Kafka;

/**
 * HTTP controller that publishes a fixed demo message to Kafka through the kafka-php client.
 */
class ProducerController extends Controller
{
    /**
     * Configure the shared producer settings and send one test message to the "test2" topic.
     *
     * The producer is used in synchronous mode (setIsAsyn(false)): send() is called
     * directly with the list of messages to publish.
     *
     * @return void
     */
    public function getProducer()
    {
        $config = \Kafka\ProducerConfig::getInstance();
        // Interval (ms) between refreshes of the topic metadata.
        $config->setMetadataRefreshIntervalMs(10000);
        // Address of the broker used to fetch metadata.
        $config->setMetadataBrokerList('127.0.0.1:9092');
        // Version of the Kafka broker being talked to.
        $config->setBrokerVersion('1.0.0');
        // Only the partition leader has to acknowledge a message.
        $config->setRequiredAck(1);
        // Synchronous mode: false disables the asynchronous producer.
        $config->setIsAsyn(false);
        // Produce interval of 500 ms.
        $config->setProduceInterval(500);

        // No callback is passed to the constructor; the messages are handed to send() instead.
        $producer = new \Kafka\Producer();
        $producer->send([
            [
                'topic' => 'test2',
                'value' => 'test...message',
            ],
        ]);
    }
}

消费者:

<?php

namespace App\Console\Commands;

use Illuminate\Console\Command;
use Kafka;

/**
 * Artisan command that consumes messages from the "test2" Kafka topic and dumps each one.
 */
class ConsumerKafka extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'consumer:kafka';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '消费kafka';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * Configures the shared consumer settings (broker, group id, topics) and starts
     * the consumer with a callback that dumps every received message.
     *
     * @return int Exit code; 0 once the consumer has stopped without throwing.
     */
    public function handle()
    {
        $config = \Kafka\ConsumerConfig::getInstance();
        // Interval (ms) between refreshes of the topic metadata.
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:9092');
        // Consumer group this command joins.
        $config->setGroupId('test2');
        $config->setBrokerVersion('1.0.0');
        $config->setTopics(['test2']);

        $consumer = new \Kafka\Consumer();
        // NOTE(review): start() presumably blocks while consuming, so the return below
        // is only reached after the consumer stops; confirm against the kafka-php docs.
        $consumer->start(function ($topic, $part, $message) {
            var_dump($message);
        });

        // The docblock promises an int; report success explicitly instead of returning null.
        return 0;
    }
}

执行命令开始监听:php artisan consumer:kafka,再请求 getProducer 方法发送消息(截图略)。或者使用 Kafka 自带的控制台消费者查看消息:sh kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test2 --from-beginning(截图略)

将消息存到数据库中(截图略)。生产者:

 /**
     * Publish the messages supplied in the request's "send" field to Kafka.
     *
     * The payload is validated before it reaches the producer, so a missing or
     * malformed "send" field produces a validation error instead of an
     * undefined-index error or a failure inside the Kafka client.
     *
     * Expected shape of "send":
     *   [['topic' => 'test2', 'value' => 'test2...message'], ...]
     *
     * @param  Request  $request  Request carrying the "send" list of messages.
     * @return void
     */
    public function getProducer(Request $request)
    {
        // NOTE(review): callers pick the target topic themselves; confirm that is intended,
        // or restrict "send.*.topic" to an allow-list.
        $request->validate([
            'send' => 'required|array|min:1',
            'send.*.topic' => 'required|string',
            'send.*.value' => 'required|string',
        ]);

        $config = \Kafka\ProducerConfig::getInstance();
        // Interval (ms) between refreshes of the topic metadata.
        $config->setMetadataRefreshIntervalMs(10000);
        // Address of the broker used to fetch metadata.
        $config->setMetadataBrokerList('127.0.0.1:9092');
        // Version of the Kafka broker being talked to.
        $config->setBrokerVersion('1.0.0');
        // Only the partition leader has to acknowledge a message.
        $config->setRequiredAck(1);
        // Synchronous mode: false disables the asynchronous producer.
        $config->setIsAsyn(false);
        // Produce interval of 500 ms.
        $config->setProduceInterval(500);

        $producer = new \Kafka\Producer();
        // Forward the original entries so any extra per-message fields sent by the caller are kept.
        $producer->send($request->input('send'));
    }

消费者:

        /**
     * Consume messages from topics a1, a2 and a3 and store each one as a KafkaMessage row.
     *
     * For every message the callback saves the topic, partition, full message array and
     * the message value, then prints a one-line summary.
     *
     * @return int 0 when the consumer stops without throwing, 1 when an exception was caught.
     */
    public function handle()
    {
        try {
            $config = \Kafka\ConsumerConfig::getInstance();
            // Interval (ms) between refreshes of the topic metadata.
            $config->setMetadataRefreshIntervalMs(10000);
            // NOTE(review): hard-coded remote broker, unlike the 127.0.0.1 used by the producer; confirm.
            $config->setMetadataBrokerList('139.159.189.151:9092');
            $config->setGroupId('test2');
            $config->setBrokerVersion('1.0.0');
            $config->setTopics(['a1', 'a2', 'a3']);

            $consumer = new \Kafka\Consumer();
            $consumer->start(function ($topic, $part, $message) {
                $record = new KafkaMessage();
                $record->topic = $topic;
                $record->part = $part;
                // NOTE(review): $message is an array; presumably the model casts "msgall"
                // (e.g. to json/array) so it can be stored - verify against KafkaMessage.
                $record->msgall = $message;
                $record->message = $message['message']['value'];
                $record->save();

                // End each summary with a newline so consecutive messages do not run together.
                echo 'topic:' . $record->topic . ' 分区:' . $record->part
                    . ' 发送消息:' . $record->message . PHP_EOL;
            });
        } catch (\Exception $exception) {
            echo $exception->getMessage() . PHP_EOL;

            // Signal failure through the exit code instead of reporting success.
            return 1;
        }

        return 0;
    }

filefile

猜你喜欢