Lumen 隊(duì)列

2021-09-15 14:41 更新

1、簡(jiǎn)介

Lumen隊(duì)列服務(wù)為各種不同的后臺(tái)隊(duì)列提供了統(tǒng)一的API。隊(duì)列允許你推遲耗時(shí)任務(wù)(例如發(fā)送郵件)的執(zhí)行,從而大幅提高web請(qǐng)求速度。

1.1 配置

.env文件的QUEUE_DRIVER選項(xiàng)決定應(yīng)用使用的隊(duì)列“驅(qū)動(dòng)”。

1.2 隊(duì)列驅(qū)動(dòng)預(yù)備知識(shí)

數(shù)據(jù)庫

為了使用database隊(duì)列驅(qū)動(dòng),需要一張數(shù)據(jù)庫表來存放任務(wù),要生成創(chuàng)建該表的遷移,運(yùn)行Artisan命令queue:table,遷移被創(chuàng)建好了之后,使用migrate命令運(yùn)行遷移:

php artisan queue:table
php artisan migrate

其它隊(duì)列依賴

下面是以上列出隊(duì)列驅(qū)動(dòng)需要安裝的依賴:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • IronMQ: iron-io/iron_mq ~2.0
  • Redispredis/predis ~1.0

2、編寫任務(wù)類

2.1  任務(wù)類結(jié)構(gòu)

默認(rèn)情況下,應(yīng)用的所有隊(duì)列任務(wù)都存放在app/Jobs目錄。任務(wù)類非常簡(jiǎn)單,正常情況下只包含一個(gè)當(dāng)隊(duì)列處理該任務(wù)時(shí)被執(zhí)行的handle方法,讓我們看一個(gè)任務(wù)類的例子:

<?php

namespace App\Jobs;

use App\User;
use App\Jobs\Job;
use Illuminate\Contracts\Mail\Mailer;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldQueue;

class SendReminderEmail extends Job implements SelfHandling, ShouldQueue
{
    use InteractsWithQueue, SerializesModels;

    protected $user;

    /**
     * 創(chuàng)建一個(gè)新的任務(wù)實(shí)例
     *
     * @param  User  $user
     * @return void
     */
    public function __construct(User $user)
    {
        $this->user = $user;
    }

    /**
     * 執(zhí)行任務(wù)
     *
     * @param  Mailer  $mailer
     * @return void
     */
    public function handle(Mailer $mailer)
    {
        $mailer->send('emails.reminder', ['user' => $this->user], function ($m) {
            //
        });

        $this->user->reminders()->create(...);
    }
}

在本例中,注意我們能夠直接將Eloquent模型傳遞到對(duì)列任務(wù)的構(gòu)造函數(shù)中。由于該任務(wù)使用了SerializesModels trait,Eloquent模型將會(huì)在任務(wù)被執(zhí)行是優(yōu)雅地序列化和反序列化。如果你的隊(duì)列任務(wù)在構(gòu)造函數(shù)中接收Eloquent模型,只有模型的主鍵會(huì)被序列化到隊(duì)列,當(dāng)任務(wù)真正被執(zhí)行的時(shí)候,隊(duì)列系統(tǒng)會(huì)自動(dòng)從數(shù)據(jù)庫中獲取整個(gè)模型實(shí)例。這對(duì)應(yīng)用而言是完全透明的,從而避免序列化整個(gè)Eloquent模型實(shí)例引起的問題。

handle方法在任務(wù)被隊(duì)列處理的時(shí)候被調(diào)用,注意我們可以在任務(wù)的handle方法中對(duì)依賴進(jìn)行類型提示。Lumen服務(wù)容器會(huì)自動(dòng)注入這些依賴。

出錯(cuò)

如果任務(wù)被處理的時(shí)候拋出異常,則該任務(wù)將會(huì)被自動(dòng)釋放回隊(duì)列以便再次嘗試執(zhí)行。任務(wù)會(huì)持續(xù)被釋放知道嘗試次數(shù)達(dá)到應(yīng)用允許的最大次數(shù)。最大嘗試次數(shù)通過Artisan任務(wù)queue:listenqueue:work上的--tries開關(guān)來定義。關(guān)于運(yùn)行隊(duì)列監(jiān)聽器的更多信息可以在下面看到。

手動(dòng)釋放任務(wù)

如果你想要手動(dòng)釋放任務(wù),生成的任務(wù)類中自帶的InteractsWithQueue trait提供了釋放隊(duì)列任務(wù)的release方法,該方法接收一個(gè)參數(shù)——同一個(gè)任務(wù)兩次運(yùn)行之間的等待時(shí)間:

public function handle(Mailer $mailer){
    if (condition) {
        $this->release(10);
    }
}

檢查嘗試運(yùn)行次數(shù)

正如上面提到的,如果在任務(wù)處理期間發(fā)生異常,任務(wù)會(huì)自動(dòng)釋放回隊(duì)列中,你可以通過attempts方法來檢查該任務(wù)已經(jīng)嘗試運(yùn)行次數(shù):

public function handle(Mailer $mailer){
    if ($this->attempts() > 3) {
        //
    }
}

3、推送任務(wù)到隊(duì)列

默認(rèn)的Lumen控制器位于app/Http/Controllers/Controller.php并使用了DispatchesJobs trait。該trait提供了一些允許你方便推送任務(wù)到隊(duì)列的方法,例如dispatch方法:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發(fā)送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);

        $this->dispatch(new SendReminderEmail($user));
    }
}

為任務(wù)指定隊(duì)列

你還可以指定任務(wù)被發(fā)送到的隊(duì)列。

通過推送任務(wù)到不同隊(duì)列,你可以對(duì)隊(duì)列任務(wù)進(jìn)行“分類”,甚至優(yōu)先考慮分配給多個(gè)隊(duì)列的worker數(shù)目。這并不會(huì)如隊(duì)列配置文件中定義的那樣將任務(wù)推送到不同隊(duì)列“連接”,而只是在單個(gè)連接中發(fā)送給特定隊(duì)列。要指定該隊(duì)列,使用任務(wù)實(shí)例上的onQueue方法,該方法有Lumen自帶的基類App\Jobs\Job提供:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發(fā)送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);
        $job = (new SendReminderEmail($user))->onQueue('emails');
        $this->dispatch($job);
    }
}

3.1 延遲任務(wù)

有時(shí)候你可能想要延遲隊(duì)列任務(wù)的執(zhí)行。例如,你可能想要將一個(gè)注冊(cè)15分鐘后給消費(fèi)者發(fā)送提醒郵件的任務(wù)放到隊(duì)列中,可以通過使用任務(wù)類上的delay方法來實(shí)現(xiàn),該方法由Illuminate\Bus\Queueable trait提供:

<?php

namespace App\Http\Controllers;

use App\User;
use Illuminate\Http\Request;
use App\Jobs\SendReminderEmail;
use App\Http\Controllers\Controller;

class UserController extends Controller{
    /**
     * 發(fā)送提醒郵件到指定用戶
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function sendReminderEmail(Request $request, $id)
    {
        $user = User::findOrFail($id);
        $job = (new SendReminderEmail($user))->delay(60);
        $this->dispatch($job);
    }
}

在本例中,我們指定任務(wù)在隊(duì)列中開始執(zhí)行前延遲60秒。

注意:Amazon SQS服務(wù)最大延遲時(shí)間是15分鐘。

3.2 從請(qǐng)求中分發(fā)任務(wù)

映射HTTP請(qǐng)求變量到任務(wù)中很常見,Lumen提供了一些幫助函數(shù)讓這種實(shí)現(xiàn)變得簡(jiǎn)單,而不用每次請(qǐng)求時(shí)手動(dòng)執(zhí)行映射。讓我么看一下 DispatchesJobs trait上的dispatchFrom方法。默認(rèn)情況下,該trait包含在Lumen控制器基類中:

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class CommerceController extends Controller{
    /**
     * 處理指定訂單
     *
     * @param  Request  $request
     * @param  int  $id
     * @return Response
     */
    public function processOrder(Request $request, $id)
    {
        // 處理請(qǐng)求...
        $this->dispatchFrom('App\Jobs\ProcessOrder', $request);
    }
}

該方法檢查給定任務(wù)類的構(gòu)造函數(shù)并從HTTP請(qǐng)求(或者其它ArrayAccess對(duì)象)中解析變量來填充任務(wù)需要的構(gòu)造函數(shù)參數(shù)。所以,如果我們的任務(wù)類在構(gòu)造函數(shù)中接收一個(gè)productId變量,該任務(wù)將會(huì)嘗試從HTTP請(qǐng)求中獲取productId參數(shù)。

你還可以傳遞一個(gè)數(shù)組作為dispatchFrom方法的第三個(gè)參數(shù)。該數(shù)組用于填充所有請(qǐng)求中不存在的構(gòu)造函數(shù)參數(shù):

$this->dispatchFrom('App\Jobs\ProcessOrder', $request, [
    'taxPercentage' => 20,
]);

4、運(yùn)行隊(duì)列監(jiān)聽器

開啟任務(wù)監(jiān)聽器

Lumen包含了一個(gè)Artisan命令用來運(yùn)行推送到隊(duì)列的新任務(wù)。你可以使用queue:listen命令運(yùn)行監(jiān)聽器:

php artisan queue:listen

還可以指定監(jiān)聽器使用哪個(gè)隊(duì)列連接:

php artisan queue:listen connection

注意一旦任務(wù)開始后,將會(huì)持續(xù)運(yùn)行直到手動(dòng)停止。你可以使用一個(gè)過程監(jiān)視器如Supervisor來確保隊(duì)列監(jiān)聽器沒有停止運(yùn)行。

隊(duì)列優(yōu)先級(jí)

你可以傳遞逗號(hào)分隔的隊(duì)列連接列表到listen任務(wù)來設(shè)置隊(duì)列優(yōu)先級(jí):

php artisan queue:listen --queue=high,low

在本例中,high隊(duì)列上的任務(wù)總是在從low隊(duì)列移動(dòng)任務(wù)之前被處理。

指定任務(wù)超時(shí)參數(shù)

你還可以設(shè)置每個(gè)任務(wù)允許運(yùn)行的最大時(shí)間(以秒為單位):

php artisan queue:listen --timeout=60

指定隊(duì)列睡眠時(shí)間

此外,可以指定輪詢新任務(wù)之前的等待時(shí)間(以秒為單位):

php artisan queue:listen --sleep=5

需要注意的是隊(duì)列只會(huì)在隊(duì)列上沒有任務(wù)時(shí)“睡眠”,如果存在多個(gè)有效任務(wù),該隊(duì)列會(huì)持續(xù)運(yùn)行,從不睡眠。

4.1 Supervisor配置

Supervisor為Linux操作系統(tǒng)提供的進(jìn)程監(jiān)視器,將會(huì)在失敗時(shí)自動(dòng)重啟queue:listenqueue:work命令,要在Ubuntu上安裝Supervisor,使用如下命令:

sudo apt-get install supervisor

Supervisor配置文件通常存放在/etc/supervisor/conf.d目錄,在該目錄中,可以創(chuàng)建多個(gè)配置文件指示Supervisor如何監(jiān)視進(jìn)程,例如,讓我們創(chuàng)建一個(gè)開啟并監(jiān)視queue:work進(jìn)程的laravel-worker.conf文件:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --daemon
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

在本例中,numprocs指令讓Supervisor運(yùn)行8個(gè)queue:work進(jìn)程并監(jiān)視它們,如果失敗的話自動(dòng)重啟。配置文件創(chuàng)建好了之后,可以使用如下命令更新Supervisor配置并開啟進(jìn)程:

sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*

要了解更多關(guān)于Supervisor的使用和配置,查看Supervisor文檔。此外,還可以使用Lumen Forge從web接口方便地自動(dòng)配置和管理Supervisor配置。

4.2 后臺(tái)隊(duì)列監(jiān)聽器

Artisan命令queue:work包含一個(gè)--daemon選項(xiàng)來強(qiáng)制隊(duì)列worker持續(xù)處理任務(wù)而不必重新啟動(dòng)框架。相較于queue:listen命令該命令對(duì)CPU的使用有明顯降低:

php artisan queue:work connection --daemon
php artisan queue:work connection --daemon --sleep=3
php artisan queue:work connection --daemon --sleep=3 --tries=3

正如你所看到的,queue:work任務(wù)支持大多數(shù)queue:listen中有效的選項(xiàng)。你可以使用php artisan help queue:work任務(wù)來查看所有有效選項(xiàng)。

后臺(tái)隊(duì)列監(jiān)聽器編碼考慮

后臺(tái)隊(duì)列worker在處理每個(gè)任務(wù)時(shí)不重啟框架,因此,你要在任務(wù)完成之前釋放資源,舉個(gè)例子,如果你在使用GD庫操作圖片,那么就在完成時(shí)使用imagedestroy釋放內(nèi)存。

類似的,數(shù)據(jù)庫連接應(yīng)該在后臺(tái)長時(shí)間運(yùn)行完成后斷開,你可以使用DB::reconnect方法確保獲取了一個(gè)新的連接。

4.3 部署后臺(tái)隊(duì)列監(jiān)聽器

由于后臺(tái)隊(duì)列worker是常駐進(jìn)程,不重啟的話不會(huì)應(yīng)用代碼中的更改,所以,最簡(jiǎn)單的部署后臺(tái)隊(duì)列worker的方式是使用部署腳本重啟所有worker,你可以通過在部署腳本中包含如下命令重啟所有worker:

php artisan queue:restart

該命令會(huì)告訴所有隊(duì)列worker在完成當(dāng)前任務(wù)處理后重啟以便沒有任務(wù)被遺漏。

注意:這個(gè)命令依賴于緩存系統(tǒng)重啟進(jìn)度表,默認(rèn)情況下,APC在CLI任務(wù)中無法正常工作,如果你在使用APC,需要在APC配置中添加apc.enable_cli=1。

5、處理失敗任務(wù)

由于事情并不總是按照計(jì)劃發(fā)展,有時(shí)候你的隊(duì)列任務(wù)會(huì)失敗。別擔(dān)心,它發(fā)生在我們大多數(shù)人身上!Lumen包含了一個(gè)方便的方式來指定任務(wù)最大嘗試執(zhí)行次數(shù),任務(wù)執(zhí)行次數(shù)達(dá)到最大限制后,會(huì)被插入到failed_jobs表,失敗任務(wù)的名字可以通過配置文件config/queue.php來配置。

要?jiǎng)?chuàng)建一個(gè)failed_jobs表的遷移,可以使用queue:failed-table命令:

php artisan queue:failed-table

運(yùn)行隊(duì)列監(jiān)聽器的時(shí)候,可以在queue:listen命令上使用--tries開關(guān)來指定任務(wù)最大可嘗試執(zhí)行次數(shù):

php artisan queue:listen connection-name --tries=3

5.1 失敗任務(wù)事件

如果你想要注冊(cè)一個(gè)隊(duì)列任務(wù)失敗時(shí)被調(diào)用的事件,可以使用Queue::failing方法,該事件通過郵件或HipChat通知團(tuán)隊(duì)。舉個(gè)例子,我么可以在Lumen自帶的AppServiceProvider中附件一個(gè)回調(diào)到該事件:

<?php

namespace App\Providers;

use Queue;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider{
    /**
     * 啟動(dòng)應(yīng)用服務(wù)
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function ($connection, $job, $data) {
            // Notify team of failing job...
        });
    }

    /**
     * 注冊(cè)服務(wù)提供者
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

任務(wù)類的失敗方法

想要更加細(xì)粒度的控制,可以在隊(duì)列任務(wù)類上直接定義failed方法,從而允許你在失敗發(fā)生時(shí)執(zhí)行指定動(dòng)作:

<?php

namespace App\Jobs;

use App\Jobs\Job;
use Illuminate\Contracts\Mail\Mailer;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Bus\SelfHandling;
use Illuminate\Contracts\Queue\ShouldQueue;

class SendReminderEmail extends Job implements SelfHandling, ShouldQueue
{
    use InteractsWithQueue, SerializesModels;

    /**
     * 執(zhí)行任務(wù)
     *
     * @param  Mailer  $mailer
     * @return void
     */
    public function handle(Mailer $mailer)
    {
        //
    }

    /**
     * 處理失敗任務(wù)
     *
     * @return void
     */
    public function failed()
    {
        // Called when the job is failing...
    }
}

5.2 重試失敗任務(wù)

要查看已插入到failed_jobs數(shù)據(jù)表中的所有失敗任務(wù),可以使用Artisan命令queue:failed

php artisan queue:failed

該命令將會(huì)列出任務(wù)ID,連接,對(duì)列和失敗時(shí)間,任務(wù)ID可用于重試失敗任務(wù),例如,要重試一個(gè)ID為5的失敗任務(wù),要用到下面的命令:

php artisan queue:retry 5

如果你要?jiǎng)h除一個(gè)失敗任務(wù),可以使用queue:forget命令:

php artisan queue:forget 5

要?jiǎng)h除所有失敗任務(wù),可以使用queue:flush命令:

php artisan queue:flush
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)