thinkphp 6 消息队列

摘要:
==False){echo“执行成功”;}否则{echo“执行失败”;}//phpthinkwqueue:listen--queuecreateOrderJob执行队列//nohupphpthinkwqueue:listen--队列createOrderJob&未作为守护进程执行}/**multitask*/publicfunctionmultiTask(){$taskType=$_GET['taskType'];开关{case‘taskA':$jobHandlerClassName='appjobMultiTask@taskA ';$ jobDataArr=['a'=˃'1'];$jobQueueName=“multiTaskJobQueue”;打破case'askB':$jobHandlerClassName='appjobMultiTask@taskB ';$ jobDataArr=['b'=˃'2'];$jobQueueName=“multiTaskJobQueue”;打破默认值:break;}$isPushed=队列::push;如果($isPushed!“;}}}})4.创建消费单任务类Job1{publicfunctionfire{//业务处理代码。具体来说,$isJobDone=$this-˃jobDone;//执行成功。如果{$job-˃Delete();print;}否则{$job-˃release;//$delay是一个延迟计划。它表示任务在打印之前延迟了3秒;}//如果{print(“作业已重试超过3次!”)$作业-˃删除();}}Publicfunctionfailed{//…任务达到最大重试次数后,它将失败}privatefunctionjobDone{Log::write;returntrue;}多任务{publicfunctiontaskA{$isJobDone=$this-˃_doTaskA;如果{$job-˃delete();打印;}否则{$job-˃release;print;}如果{print(“作业已重试3次以上!

1.安装think-queue

composer require topthink/think-queue

2.配置消息队列,将config/queue.php将’default’ => ‘sync’改为’default’ => ‘redis’,使用Redis驱动

thinkphp 6 消息队列第1张

如选择database,需创建表

CREATE TABLE `prefix_jobs` ( `id` int(11) NOT NULL AUTO_INCREMENT, `queue` varchar(255) NOT NULL, `payload` longtext NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserve_time` int(10) unsigned DEFAULT NULL, `available_time` int(10) unsigned NOT NULL, `create_time` int(10) unsigned NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.创建生产者

class Index extends BaseController
{
    
    /**
     * 单任务
     */
    public function singleTask()
    {
        //当前任务将由哪个类来负责处理
        $jobHandlerClassName = 'appjobJob1';
        //业务数据 对象需要手动转序列化
        $jobData = ['ts' => time()];
        //队列名称
        $jobQueueName = "createOrderJob";
        //入队列,later延时发送,单位秒。push立即发送
        $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName);
        //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
        // database 驱动时,返回值为 1|false  ;   redis 驱动时,返回值为 随机字符串|false
        if( $isPushed !== false ){
            echo '执行成功';
        }else{
            echo '执行失败';
        }
        //php think queue:listen --queue createOrderJob  执行队列
        //nohup php think queue:listen --queue createOrderJob &  不以守护进程执行
    }
 
    /**
     * 多任务
     */
    public function multiTask(){
        $taskType = $_GET['taskType'];
        switch ($taskType) {
            case 'taskA':
                $jobHandlerClassName  = 'appjobMultiTask@taskA';
                $jobDataArr = ['a'   => '1'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            case 'taskB':
                $jobHandlerClassName  = 'appjobMultiTask@taskB';
                $jobDataArr = ['b'   => '2'];
                $jobQueueName = "multiTaskJobQueue";
                break;
            default:
                break;
        }
 
        $isPushed = Queue::push($jobHandlerClassName, $jobDataArr, $jobQueueName);
        if ($isPushed !== false) {
            echo("the $taskType of MultiTask Job has been Pushed to ".$jobQueueName ."<br>");
        }else{
            echo "push a new $taskType of MultiTask Job Failed!";
        }
    }
 
}

4.创建消费者

单任务

class Job1
{
    public function fire(Job $job, $data)
    {
        //业务处理代码,具体不贴出来了
        $isJobDone = $this->jobDone($data);
        //执行成功删除
        if($isJobDone){
            $job->delete();
            print("任务已经被执行成功并且删除");
        }else{
            $job->release(3); //$delay为延迟时间 表示该任务延迟3秒后再执行
            print("任务3s后再次被执行");
        }
        //通过这个方法可以检查任务重试了几次
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    public function failed($data)
    {
        // ...任务达到最大重试次数后,失败了
    }
 
    private function jobDone($data){
        Log::write('这是数据 ' . json_encode($data));
        return true;
    }

多任务

class MultiTask{
    public function taskA(Job $job,$data){
 
        $isJobDone = $this->_doTaskA($data);
 
        if ($isJobDone) {
            $job->delete();
            print("Info: TaskA of Job MultiTask has been done and deleted"."
");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    public function taskB(Job $job,$data){
 
        $isJobDone = $this->_doTaskB($data);
 
        if ($isJobDone) {
            $job->delete();
            print("Info: TaskB of Job MultiTask has been done and deleted"."
");
        }else{
            $job->release(3);
            print("任务3s后再次被执行");
        }
        if ($job->attempts() > 3) {
            print("Job has been retried more than 3 times!");
            $job->delete();
        }
    }
 
    private function _doTaskA($data) {
        print("Info: doing TaskA of Job MultiTask "."
");
        return true;
    }
 
    private function _doTaskB($data) {
        print("Info: doing TaskB of Job MultiTask "."
");
        return true;
    }
}

5.执行

php think queue:listen --queue createOrderJob   //(队列名)

 --daemon  //可后台运行,具体看手册

//$job->delete();   删除任务
//$job->attempts();  查看任务执行次数
// 注意:执行完任务后必须删除任务

在database 模式下,2.7.1 和 2.7.2 中的重发逻辑是先删除原来的任务,然后插入一个新的任务。2.7.3 中的重发时机是直接更新原任务。

而在redis 模式下,3种重发都是先删除再插入。

不管是哪种重发方式,重发之后,任务的已尝试次数会在原来的基础上 +1 。

此外,消费者类中需要注意,如果 fire() 方法中可能抛出异常,那么如果不需要自动重发的话, 请在抛出异常之前将任务删除 $job->delete() ,以免产生bug。 如果需要自动重发的话,请直接抛出异常,不要在 fire() 方法中又手动使用 $job->release() , 这样会导致该任务被重发两次,产生两个一样的新任务。

 可配合supervisor使用,保证进程常驻

转: https://www.freesion.com/article/81081354933/

https://www.sxxblog.com/index/detail/archive/18.html

https://blog.csdn.net/chengzheng5879/article/details/100913455

免责声明:文章转载自《thinkphp 6 消息队列》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇给X9DRL-iF双路服务器主板刷BIOSFastAPI框架快速构建高性能的api服务下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

MFC常见问题解惑

MFC类的分类 1Root: CObject :CObject2Application Architecture Classes:CWinApp/CFrameWnd/... 3Window, Dialog, and Control Classes:CWnd/CDialog/...4Drawing and Printing Classes :CGdiObje...

kafka学习笔记01-kafka简介和架构介绍

kafka介绍 kafka 最开始是 Linkedin 用来处理海量的日志信息,后来 linkedin 于 2010 年贡献给了 Apache 基金会并成为了顶级项目。 后来开发 kafka 的一些人出来创立了一家公司 confluent,专门从事 kafka 的开发维护和在它之上提供各种服务。 现在 kafka 把它定义为一个分布式数据流处理平台。 ka...

RabbitMQ 运转流程

生产者发送消息 1、生产者连接到 RabbitMQ Broker,建立一个连接(Connection),开启一个信道(Channel) 2、生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等 3、生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等 4、生产者通过路由键将交换器和队列绑定起来 5、生产者发送消息至 Ra...

RabbitMQ、Kafka、RocketMQ的优劣势

今天我们一起来探讨:  全量的消息队列究竟有哪些?  Kafka、RocketMQ、RabbitMQ的优劣势比较  以及消息队列的选型 最全MQ消息队列有哪些 那么目前在业界有哪些比较知名的消息引擎呢?如下图所示 这里面几乎完全列举了当下比较知名的消息引擎,包括:  ZeroMQ  推特的Distributedlog  ActiveMQ:Apach...

Kafka — 高吞吐量的分布式发布订阅消息系统【转】

1.Kafka独特设计在什么地方?2.Kafka如何搭建及创建topic、发送消息、消费消息?3.如何书写Kafka程序?4.数据传输的事务定义有哪三种?5.Kafka判断一个节点是否活着有哪两个条件?6.producer是否直接将数据发送到broker的leader(主节点)?7.Kafa consumer是否可以消费指定分区消息?8.Kafka消息是采...

.NET 环境中使用RabbitMQ(转)

出处:http://www.cnblogs.com/yangecnu/p/4227535.html 在企业应用系统领域,会面对不同系统之间的通信、集成与整合,尤其当面临异构系统时,这种分布式的调用与通信变得越发重要。其次,系统中一般会有很多对实时性要求不高的但是执行起来比较较耗时的地方,比如发送短信,邮件提醒,更新文章阅读计数,记录用户操作日志等等,如果...