php 异步执行耗时任务

php 异步执行耗时任务

php 异步执行耗时任务

正常情况下,PHP执行的都是同步请求,代码自上而下依次执行,但有些场景如发送邮件、执行耗时任务等操作时就不适用于同步请求,只能使用异步处理请求。 最近产品要求项目做自动更新,本地获取远程源项目文件更新打本地,由于文件很多,不可能一直for循环下去 最后考虑使用php异步方式进行更新。


一、利用fsockopen()方法解决PHP异步请求

 /**
    * @description  更新代码
   */
   public function saveCode(){
       try {
           Cache::put('is_success',0,3600);
           /*线上文件*/
           $new_code_size=(new Commons())->v_post('https://XX.XX.com/permissions/newfile',[]);
           Cache::put('new_code_sizes',$new_code_size,3600);
           $new_code_size=json_decode($new_code_size,true);

           /*本地文件*/
           $code_size=CodeSize::get(['position','size','list','file_name','is_file'])->toArray();
           $code_position=array_column($code_size,'position');

           Cache::put('code_position',json_encode($code_position,JSON_UNESCAPED_UNICODE),600);
//           /*key*/
           Cache::put('codekey',uniqid(),3600);

           /*异步执行*/
           $host=$_SERVER["HTTP_HOST"];
           $port=80;
           $path=config('app.url').'/permissions/Code_update';
           $size=50;//每次异步执行50个

           $number=ceil(count($new_code_size)/$size);
           Cache::put('code_max_num',$number,3600);
           Cache::put('code_now_num',0,3600);

           for ($i=0;$i<$number;$i++){
               if(count($new_code_size)){
                   $model=array_slice($new_code_size, 0, $size);
                   $new_code_size=arrays_unset($new_code_size,$size);
               }else{
                   $model=array_slice($new_code_size, 0, count($new_code_size));
                   $new_code_size=arrays_unset($new_code_size,count($new_code_size));
               }
               $this->asyncRequest($host,$port ,$path,$model,['num'=>$i]);//调用异步
           }

           return response(['error' => 200, 'data' => [
               'file_code'=>Cache::get('codekey'),
               'ws'=>config('app.wss').':'.config('app.port')
           ], 'msg' => "正在更新中"]);

       }catch (\Exception $e){
           return response(['message' => $e->getMessage(), 'error' => 412]);
       }
   }
   /**
    * @description 异步
    * @param string $host 主机
    * @param int $port 端口
    * @param string $path 耗时接口
    * @param array $model 耗时数据
    * @param array $param 传递参数
   */
   public static function asyncRequest(string $host,int $port ,string $path,array $model,array $param){
       Cache::put('Code_update'.$param['num'],json_encode($model),600);

       $query = isset($param) ? http_build_query($param) : '';
       \Log::info('参数:'.$query);

       $errno = 0;
       $errstr = '';
       $timeout = 30; //连接超时时间(S)
       $fp = @fsockopen($host, $port, $errno, $errstr, $timeout);

       if (!$fp) {
           \Log::info('连接失败');
           return '连接失败';
       }

       if ($errno || !$fp) {
           \Log::info($errstr);
           return $errstr;
       }
       stream_set_blocking($fp,0); //非阻塞
       stream_set_timeout($fp, 1);//响应超时时间(S)
       $out  = "GET " . $path.'?'.$query . " HTTP/1.1\r\n";
       $out .= "host:" . $host . "\r\n";
       $out .= "content-length:" . strlen($query) . "\r\n";
       $out .= "content-type:application/x-www-form-urlencoded\r\n";
       $out .= "connection:close\r\n\r\n";
       $out .= $query;

       $result = @fputs($fp, $out);

       @fclose($fp);

       \Log::info('结果:'.$result);
       return $result;
   }
   /**
    * 更新耗时接口
   */
   public function Code_update(Request $request){
       $param=$request->all();
       $query = isset($param) ? http_build_query($param) : '';
       \Log::info( '进入-> 参数'.$query);
        $code_update=Cache::get('Code_update'.$param['num']);
       \Log::info( '数据-> 参数'.$code_update);

       $model=json_decode($code_update,true);
       $code_position=json_decode(Cache::get('code_position'),true);

       foreach ($model as $item){
           $key=array_search($item['position'],$code_position);
           if($key==false){
               (new CodeController())->localsaveCode($item,'https://XX.XX.com',false);//耗时更新操作
           }else if($code_position[$key]['size']!=$item['size']){
               (new CodeController())->localsaveCode($item,'https://XX.XX.com',true);//耗时更新操作
           }
       }

       $num=Cache::get('code_now_num')+1;
       \Log::info( '------------num:'.$num.'----code_max_num:'.Cache::get('code_max_num'));

       if($num==Cache::get('code_max_num') ){
           \DB::statement('truncate table lty_code_size');//更新代码量
           $this->code_size();//代码量大小获取更新
           /*更新原代码大小*/
           $new_code_size=json_decode(Cache::get('new_code_sizes'),true);
           foreach ($new_code_size as $item_code ){
               $CodeSize=CodeSize::where('position',$item_code['position'])->first();
               if($CodeSize){
                   $CodeSize->original_size=$item_code['size'];
                   $CodeSize->save();
               }
           }

           try {
               \Log::info( '生成表');
               Artisan::call('migrate');
               (new ImportService())->Cache_message(Cache::get('codekey'),'文件数据已全部更新成功');
               Cache::put('is_success',1,3600);
               /*工作目录在public 不能直接运行 composer update 移动到public下*/
               File::copy(base_path('composer.json'), public_path('composer.json'));
               File::copy(base_path('composer.lock'), public_path('composer.lock'));

               \Log::info( '执行 composer update is_success:'.Cache::get('is_success'));

               exec ("composer update ../",$output,$update);

               \Log::info( '执行composer update ');

               \Log::info( '命令行执行完毕');
           }catch (\Exception $e){
               (new ImportService())->Cache_message(Cache::get('codekey'),'命令执行遇到错误');
               \Log::info( $e->getMessage());
           }
       }else{
           Cache::put('code_now_num',$num,3600);
       }

   }

当前laravel中也有异步Event机制可以用来使用,但是能用不如能懂。这里也说下laravel 的异步处理。

二、分别创建Event 和 Listener

\App\Events\Test::class => [
            \App\Listeners\TestA::class
        ],

app\Events\Test

<?php

namespace App\Events;

use Illuminate\Broadcasting\Channel;
use Illuminate\Queue\SerializesModels;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;

class Test
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    public $str;

    /**
     * Create a new event instance.
     *
     * @return void
     */
    public function __construct($str)
    {
        $this->str = $str;
    }

    /**
     * Get the channels the event should broadcast on.
     *
     * @return \Illuminate\Broadcasting\Channel|array
     */
    public function broadcastOn()
    {
        return new PrivateChannel('channel-name');
    }
}

App\Listeners\TestA

<?php

namespace App\Listeners;

use App\Events\Test as TestEvent;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class TestA implements ShouldQueue
{

    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @param  TestEvent  $event
     * @return void
     */
    public function handle(TestEvent $event)
    {
        logger()->channel('abc')->info('I got msg:'.$event->str.' at:'.date('Y-m-d H:i:s'));
        sleep(5);
    }
}

如上所示,我们的 监听器实现了 ShouldQueue,使用了事件队列。同时,我们把 config/queue.php 文件中的


<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Default Queue Connection Name
    |--------------------------------------------------------------------------
    |
    | Laravel's queue API supports an assortment of back-ends via a single
    | API, giving you convenient access to each back-end using the same
    | syntax for every one. Here you may define a default connection.
    |
    */

//    'default' => env('QUEUE_CONNECTION', 'redis'),
    'default' => 'redis',
    .
    .
    .

];

然后清除配置缓存,把队列跑起来

php artisan config:clear

php artisan queue:work

写个接口,进行事件分发

<?php

namespace App\Http\ApiControllers;

use App\Events\Test as EventTest;

class TestController extends Controller
{

    public function test()
    {
         event(new EventTest('123'));
         var_dump('456 at:'.date('Y-m-d H:i:s'));exit;
    }

}

可以看到,我在事件中传入了一个字符串,然后在监听器中,把字符串写入日志并带上时间戳,然后sleep(5),然后在本地方法中把当前时间戳打印出来。

这样,当我调用接口时,如果是一个同步事件,则 会先在日志中打印一个 时间戳A,然后过5秒,接口中再打印一个时间戳B,这两个时间戳相隔5秒。

如果是一个异步事件,则 日志的打印时间戳A后的 sleep(5) 并不会 延迟 接口日志中打印的时间戳B,也就是说,如果队列不拥挤的话,时间戳B不会晚于时间戳5秒。

我们打开日志,然后调用接口,发现

接口返回:

string(26) "456 at:2020-03-17 12:14:42"
日志记录:

[2020-03-17 12:14:45] dev.INFO: I got msg:123 at:2020-03-17 12:14:45
队列记录:

[2020-03-17 12:14:45][JWStHEjScMFNiOqUwRPL0syVko5d2v16] Processing: App\Listeners\TestA
[2020-03-17 12:14:50][JWStHEjScMFNiOqUwRPL0syVko5d2v16] Processed:  App\Listeners\TestA

可以看到,事件在 12:14:45 被监听到进入队列,而 接口却在 12:14:42 已经执行完了,也就是说 事件被分发后,request 进程就不阻塞等待了,直接执行接下来的代码。

如此,我们实现了一个简单的 事件异步队列。

若是对于一些简单的、非耗时的、需要立即执行的任务,通过这种方式处理更简单一些: 调度器中新增了一个 dispatchAfterResponse() 方法,顾名思义,该方法用于在响应发送给客户端之后执行一个任务,对应的使用场景如下:

该方法用于在响应发送后、连接关闭前执行某个任务,其实现原理有点类似终止中间件,会在应用程序处理请求完成之前注册一个可运行的终止回调到应用。

猜你喜欢