129. 队列queue

   在系统运行期间,资源紧缺的情况下,应当优先满足紧要的事务,而不是那么急的事情可以稍后处理,比如常见的访问分析在时间上通常就可以延后一点,那么这些不急的事务如何稍后处理呢?可以将完成这些事务所必须的数据先存放起来,形成一个队,每件事务就是队中的一个节点,在系统闲时再从队中逐个取出事务进行处理,这个“队”就是本篇所讲的主题“队列queue”。

队列除了可以让不急的事务稍后处理外,还可以把事务分派到其他机器上处理,这样就实现了并行,从而提高了整体处理能力,因此在大型系统中,“队列”是非常有必要和常见的。

从不同角度可以将队列分类,按事务处理是否有顺序,可以分为有序队列和无序队列,按处理时能否容许遗漏部分事务可分为可信队列和不可信队列等,关于队列有一些术语或概念需要明白。

队列术语:
queue(队列):

这里云客将其定义为:可将一些事务稍后处理或分派到其他系统上处理的一种机制。也有资料说队列是只允许在一端进行插入操作,而在另一端进行删除操作的线性表。不同角度有不同解释,重要的是读者需要有多角度的理解和掌握,“queue”翻译为“队列”而不是“列队”,源于后者是动词,前者是名词。

item(队列条目):
队列中的条目,对应着一件事务,通常会包含事务所需的数据、条目本身的建立时间、条目id

producers(生产者):
队列事务的生产者,也可以说是队列条目的生产者

consumer(消费者):
队列中事务的处理者,也可以说是队列条目的消化者,通常而言采用术语“消费者”,不论是处理者、消化者、消费者,读者须明白她们的含义都一样,不同语境中可能采用不同的词,drupal中使用了“工作者worker”一词,见下文

Reliable(可信):
用来描述可信队列,这种队列保证每个条目至少被处理一次,且保证条目顺序,但顺序是否有逻辑上的严格性要视不同实现而定,见下文

non-reliable(非可信):
用以描述不可信队列,这种队列尽最大努力保证条目顺序和处理,但不担保,换句话说可能会有遗漏事务,非可信队列问问具备很高的性能,在要求不高时经常采用这种队列

lease(租约):
通常指处理者从队列中取出一个条目处理时,向队列宣告的预计处理时间,在超期时队列可以强制收回条目,这防止处理者崩溃后无法返回信息,避免队列陷入长时间的等待

claim(索要):
动词,即处理者从队列中取出一个条目来处理

接下来我们看一看drupal的队列具体实现,首先看的是“队列对象”,生产者调用队列对象放入条目,消费者从队列对象中取出条目,队列对象由队列对象工厂实例化。

 

队列对象工厂:
服务idqueue

类:Drupal\Core\Queue\QueueFactory
该服务是队列对象的总工厂,在内部调用其他工厂来产生队列对象,在理解该服务时,需要先明白“队列服务”和“队列对象”概念的区别:
队列服务是容器中的一个服务,用于产生队列对象,是队列对象的工厂,队列对象是以下接口的实例:

          \Drupal\Core\Queue\QueueInterface

每一个队列对象都对应着一个队列名,从该接口可见如何使用队列对象。

站点配置文件中有几个配置项用于指定队列服务,在默认情况下她们是不存在的:
'queue_reliable_service_' . $name
用于保存可信队列,键名中$name为队列名,键值为可信队列的服务名,即容器ID,字符串值,非数组

'queue_service_' . $name
同前一样,不过是用于保存非可信队列的服务名

queue_default
指定默认队列服务名,当以上设置都不存在时,回退到该项设置的服务,如果该项也没有设置,那么回退到系统默认队列服务“queue.database

该工厂用于返回队列对象,接收站点配置对象,仅有一个方法:

public function get($name, $reliable = FALSE)
参数$name为队列名, $reliable指示是否须要可信队列。在内部先找到队列服务,然后用队列服务产生队列对象并返回,如果$reliabletrue,那么先在可信队列服务中查找,如无则依次查找非可信队列服务、默认队列服务,都不存在时使用系统默认队列服务



系统默认队列服务及队列对象:
服务idqueue.database
类:Drupal\Core\Queue\QueueDatabaseFactory

该服务非常简单,用于实例化队列对象,系统默认队列对象如下:
\Drupal\Core\Queue\DatabaseQueue
该队列对象以数据库做储存,是一个可信队列,实现了可信队列接口:
  \Drupal\Core\Queue\ReliableQueueInterface(该接口继承自队列接口)
同时实现了垃圾收集接口:

  \Drupal\Core\Queue\QueueGarbageCollectionInterface

这可以实现垃圾清理功能。对该队列对象主要知识点说明如下:
用于储存队列数据的数据库表为“
queue”,有5个字段,其含义如下:

  •     item_id:队列中的条目ID,主键,无符号整数
  •     name:队列名,同一队列中各条目的队列名相同,最大255长的字符
  •     data:队列中保存的数据,可是任意数据,被序列化后储存,该字段为二进制格式
  •     created:队列中条目被创建时的时间戳
  •     expire:队列中条目被执行时的超期时间(时间戳,不是租约的秒),没被执行时默认为0
     

需要注意expire如果为0,意为队列中的该条目正等待消费者处理,如果为非零,此时是一个时间戳,隐含的表示条目当前状态为正在处理,时间戳指定了超时时间点,如果超时则认为处理她的消费者发生了异常,此时可将条目让给其他消费者

队列对象各方法说明如下:
public function createItem($data)
向队列中添加一个条目,参数为任意需要传递给消费者(处理者)的数据,数组、对象、标量均可,被序列化保存到数据库,成功时返回条目id,失败时返回FALSE或抛出异常

public function numberOfItems()
计算队列中条目的数量,返回一个整数,出错时抛出异常,注意如果系统非常繁忙,或消费者处理的很快,那么该数量变化非常快,该方法返回值仅返回调用时那一刻的数值,可能返回后就变了

public function claimItem($lease_time = 30)
从队列中索要(claim)一个条目进行处理,传递一个以秒为单位的租约时间,这里的“索要”是什么意思呢?即返回该条目相关的数据,并在队列中标记超期时间,并不是从队列中返回相关数据后就删除条目,删除操作应该由消费者进行,该方法返回一个数据对象,有3个公有属性:data, created, item_id,分别是传递的数据、条目创建时间(时间戳)、条目id,如果失败将返回false,或抛出异常,该方法需要注意以下几个问题:

1、数据反序列化时,需要具备相关环境,即假设数据对象是一个类对象,那么类必须已经加载或能自动加载,否则会抛出异常,由于drupal是自动加载类,开发者不必担心该问题

2、并发问题:web访问是并发的,消费者可以并发的索要条目,但该方法在标记超期时间时,其查询条件可以保证只有一个消费者得到条目,从而避免同一个条目被重复处理,这和锁系统有类似之处(依赖数据库进行排他)

3、索要条目的顺序是按创建时间和条目id排序的,创建时间越早越先处理,如果创建时间相同那么id越小越先处理,换句话说就是“先进先出”,但需注意这里说的“处理”仅代表条目被返回给消费者,由于web的并发性,真实的条目对应的事务处理时间可能会有差异,各条目之间不能假设存在逻辑上的先后依赖关系,即下一个条目处理依赖前一个条目的完成,除非消费条目时具备单一消费者,且该消费者会在内部进行判断,比如先用锁系统保证只有一个请求线程在执行消费者,这样的条件下就是单一消费者,此时可以有先后逻辑依赖关系,默认的drupal在执行计划任务时,并没有在锁系统保护下,因此队列中前后的事务之间不能有依赖关系。

public function releaseItem($item)
释放一个条目,即消费者放弃处理一个条目,其他消费者可以继续处理,条目好像没有被索要过一样,返回布尔值

public function deleteItem($item)
从队列中删除一个条目,消费者在成功处理完条目后应调用此方法进行删除,如不删除会被垃圾收集方法重置为可再处理,导致重复处理;该方法无返回值

public function createQueue()
创建队列,对于数据库队列而言,该方法不需要做什么

public function deleteQueue()
删除队列,会删除队列中所有的条目,而不管其是否已被处理或是否正在处理

public function garbageCollection()
垃圾收集处理,将超期的批处理任务删除(超期时间10天);将超过租约时间的条目重置为可再次索要,以便再次处理,由此可知已经被成功处理的条目消费者应当对其进行删除,否则会导致重复处理

 

队列工作者插件管理器:
服务idplugin.manager.queue_worker

类:Drupal\Core\Queue\QueueWorkerManager
插件目录:Plugin/QueueWorker
插件接口:Drupal\Core\Queue\QueueWorkerInterface
释文类:Drupal\Core\Annotation\QueueWorker
定义修改钩子:queue_info

这其实就是队列消费者插件管理器,消费者在这里被系统定名为“工作者”,因此本篇也将其称为工作者,其管理的每一个插件就是一个队列消费者,插件id作为对应的队列名,如果插件定义中有cron键(其值不为NULL),那么这些工作者会在计划任务中自动执行,详见本系列计划任务主题和以下方法:
    \Drupal\Core\Cron::processQueues

这里注意工作者运行时并不在锁系统保护下,换句话说可能多个请求都在并发执行。
工作者接口中仅定义了以下方法:

    \Drupal\Core\Queue\QueueWorkerInterface::processItem

她接收队列条目携带的数据,而不是队列条目对象,换句话说接收的是创建队列条目时传递的参数

 

自定义工作者:
像其他插件一样定义插件类即可,系统中locale模块定义了一个工作者插件:

Drupal\locale\Plugin\QueueWorker\LocaleTranslation

可参考对比,释文中“cron = {"time" = 30}”所定义的时间以秒为单位,如无定义默认为15秒,该时间将作为工作者在每次计划任务执行时可运行的总时间,也被当做每个队列条目的租用时间,因此需要依据业务场景合理设置,由于计划任务设置了脚本最大执行时间为240秒,所以工作者定义的时间不应该高于此时间,除非再次设置执行总时间,再次说明工作者的调用详见计划任务的以下方法:

\Drupal\Core\Cron::processQueues

在工作者运行期间,如一切正常,系统将处理条目,并删除条目,如发生意外可抛出以下异常:

让条目重新列队异常:
类:\Drupal\Core\Queue\RequeueException

立即释放正在处理的条目,让其可被再次处理,工作者接着索要下一个条目进行处理,大概率可能索要到的是刚刚释放的条目,但也不一定

挂起队列异常:
类:\Drupal\Core\Queue\SuspendQueueException

释放当前条目,并终止队列中随后条目的处理,系统将记录异常日志,跳过当前队列(工作者),开始下一个工作者的执行

任何其他异常:
类:\Exception

抛出的任何其他异常都不能被传递给更上层代码,而是终止当前条目的处理,并记录异常日志,开始下一个条目的执行,当前被终止的条目在队列中仍处于正在执行状态,其将在超期后,被队列垃圾收集机制重置为可再执行状态

 

补充:
1、如果插件定义中有cron键设置,队列工作者会在计划任务中运行,这期间,环境账户被切换为匿名账户,所以不要去操作会话数据,没有cron键设置的队列工作者不会在计划任务中执行

2、通常队列消费者和生产者会成对实现,关于生产者比较简单,可参看“update.processor”服务

3、有两个专用于批处理的队列:

Drupal\Core\Queue\Batch

Drupal\Core\Queue\BatchMemory

将在批处理主题中讲解

4、系统定义了内存型队列:\Drupal\Core\Queue\Memory,这种队列特点是数据存放在内存中,通常需要在一个请求中将任务消化完

 

 

本书共136小节:

评论 (写第一个评论)