Swoole使用多进程创建进程池

场景

需要对N多个队列进行数据处理,每个队列一次处理一个数据耗时较长。要尽可能短的时间里处理一个队列数据需要开M多个进程消耗一个队列数据。N个队列需要对应 N * M 个进程处理,但是还需要有一个限流的功能,要求单个队列每分钟最多处理100数据。

思路

使用 Swoole 的多进程 + 进程池的方案进行进程调度处理。

队列1  <---->  进程A(建立进程池)
                  |-- 子进程A1
                  |-- 子进程A2
                  |-- 子进程 ...
队列2  <---->  进程B(建立进程池)
                  |-- 子进程B1
                  |-- 子进程B2

涉及到的知识点:

  • 限流,使用模拟令牌桶算法
  • 利用 Swoole 进程池自动管理工作进程,柔性重启
  • 利用 Redis 的 bRPop 阻塞队列
  • 进程的优雅重启、退出,通过监听系统信号

实操

1.限流实现

使用 redis 模拟实现了每分钟限流:

<?php

// [queueName => workCofing] 可作为整体配置,统一引入
$redisQueue = [
    'queue1'=>['workerNum'=>2, 'rateMin'=>100,], 
    'retry_queue1'=>['workerNum'=>1, 'rateMin'=>1,],
];

$curMin = date('YmdHi');
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

foreach ($redisQueue as $k=>$v){
    $_key = 'rateMin_' . $k . '_' . $curMin;
    $fillList = range(1, $v['rateMin']);
    $redis->lpush($_key, ...$fillList);
    $redis->expire($_key, 60);
}

2.多进程+进程池

利用Swoole进程池自动管理工作进程

创建运行主文件,如:multiSwoolePool.php

<?php

// [queueName => workCofing] 可作为整体配置,统一引入
// retry 前缀为重试队列
// workNum 每个队列对应的工作进程数量
// rateMin 每个队列的消耗速率 单位 分
$config = [
    'queue1'=>['workerNum'=>2, 'rateMin'=>100,], 
    'retry_queue1'=>['workerNum'=>1, 'rateMin'=>1,],
];

$queueLen = count($config);
if(empty($config)) exit('empty queue');


use Swoole\Process;

// 用于保存子进程pid,方便后续进程重启和停止
$processPidFile = 'process_pid.txt';
file_put_contents($processPidFile, '');

foreach ($config as $configQueueName=>$configParams){
    // 启动多个子进程
    $process = new Process(function () use ($configQueueName, $configParams, $processPidFile) {
        echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . " start {$configQueueName}" . PHP_EOL;
        
        file_put_contents($processPidFile, getmypid() . PHP_EOL, FILE_APPEND);
        
        // 子进程启动进程池
        $workerNum = $configParams['workerNum'];
        $pool = new Swoole\Process\Pool($workerNum);
        // $pool->set(['enable_coroutine' => true]); // 是否开启协程
        $pool->on("WorkerStart", function ($pool, $workerId) use ($configQueueName, $configParams) {
            echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} is started\n";

            try {
                // 使用 include 便于业务重载
                include "logic.php";
            } catch (\Exception $e){
                // 记录错误日志
                echo $e->getMessage() . PHP_EOL;
                exit; // 让出错进程主动退出,进程池会自动拉起新进程(由业务侧保证数据)
            }
        });

        $pool->on("WorkerStop", function ($pool, $workerId) {
            echo date('Y-m-d H:i:s') . " Worker #{$workerId} is stopped\n";
        });
        $pool->start();

        echo date('Y-m-d H:i:s') . ' Child #' . getmypid() . ' exit' . PHP_EOL;
    });
    $process->start();
}

// 监听多进程
for ($i=0; $i<$queueLen; $i++) {
    $status = Process::wait(true);
    echo date('Y-m-d H:i:s') . " Recycled #{$status['pid']}, code={$status['code']}, signal={$status['signal']}" . PHP_EOL;
}
echo date('Y-m-d H:i:s') . ' Parent #' . getmypid() . ' exit' . PHP_EOL;

// Swoole\Process::daemon();

3.业务处理

进程的优雅重启、退出,通过监听系统信号,文件 logic.php

<?php

/**
 * 进程内传入参数
 * $pool, $workerId
 * $configQueueName, $configParams
 * 
 */

$redis = new Redis();
$redis->pconnect('127.0.0.1', 6379);

$logic = new MyBusiness();

// 安装退出进程信号,实现优雅退出
$running = true;
pcntl_signal(SIGTERM, function () use (&$running, $pool, $workerId, $redis) {
    $running = false;
    echo date('Y-m-d H:i:s') . " Worker #{$pool->getProcess($workerId)->pid}-{$workerId} TERM TERM TERM !!!\n";
    // 关闭 redis、mysql 连接等
    $redis->close();
    exit;
});


while ($running) {
    // 响应退出信号防止逻辑丢失
    pcntl_signal_dispatch();
    
    $result = $redis->bRPop($configQueueName, 10); // 10s 超时
    if ( $result == null) continue;

    [$_queueName, $_params] = $result;
    // step 0 队列调度,防止业务逻辑耗时进程异常导致数据丢失
    if (!preg_match('/^retry/', $configQueueName)) $redis->lPush($configQueueName, $_params);


    // step 1 检查限流/分钟(模拟令牌桶算法)
    $rateMinKey = 'rateMin_' . $configQueueName . '_' . date('YmdHi');
    $rateValue = $redis->rPop($rateMinKey);
    if(empty($rateValue)) {
        // 限流时等待下一次任务
        $redis->rPush($configQueueName, $_params);
        $redis->lPop($configQueueName);
        
        // 限流时: redis 操作结束也可响应退出信号
        pcntl_signal_dispatch();

        $rand = mt_rand(2, 10);
        echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName}:RateLimit will sleep {$rand}s\n";
        sleep($rand);
        continue;
    }

    // step 2 处理业务
    echo date('Y-m-d H:i:s') . " Worker #{$workerId}:{$configQueueName} Start deal logic, current TokenBucketValue:{$rateValue}\n";
    try {
        $logic->manage($configQueueName, $_params);
    } catch (\Exception $e) {
        // 业务异常时的处理
        echo $e->getMessage() . PHP_EOL;
    }

    // step 3 清理变量

}


// 业务处理
class MyBusiness {
    public function manage($queueName, $queueParams){
        $params = json_encode($queueParams);
        // 模拟业务处理耗时
        echo date('Y-m-d H:i:s') . " Deal Logic Start:{$queueName}-{$params}==========>>\n";
        
        sleep(2);

        // 模拟异常
        // if ($params%9==0) throw new Exception("Exception #queueName:{$queueName}, Value is {$params}");

        // 业务处理失败,丢弃或推送到重试队列
        $usedMemory = memory_get_usage();
        $usedMemory = round($usedMemory/1024/1024, 3);
        echo date('Y-m-d H:i:s') . " Deal Logic Success #queueName:{$queueName}-params:{$params}-usedMemory:{$usedMemory}Mb==========>>\n";
    }
}

4. 给程序发送重启、停止信号

进程启动时记录下自己的pid,使用系统命令发送信号

# reloadMultiSwoolePool.sh
# -10 SIGUSR1 柔性重启
cat process_pid.txt | xargs kill -SIGUSR1 


# stopMultiSwoolePool.sh
# -15 SIGTERM 柔性停止
cat process_pid.txt | xargs kill -SIGTERM 

参考文章

单进程(Process) | Swoole4 文档

进程池(Process\Pool) | Swoole4 文档

【Swoole系列3.5】进程池与进程管理器 (360doc6.net)

Author: thinkwei

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注