cbq

A protocol-based queueing system for ColdBox

Requirements

  • Adobe ColdFusion 2018+ or Lucee 5+
  • ColdBox 6+

Definitions

Queue Connection

A queue connection defines how to connect to a backend service like Redis, RabbitMQ, or even a database. Any given queue connection can have multiple "queues" which are named stacks of queued jobs or messages to be delivered.

Queue

A named stack of jobs or messages to be delivered. A queue connection must have at least one queue which is usually "default". A queue connection can have as many queues as desired. This is mostly used later when defining queue workers to scale different queues at different priorities.

Queue Provider

A queue provider is how a queue connection connects to a backend service like Redis, RabbitMQ, or a database. It implements the necessary interface to send the jobs and to work the queues. A queue provider can be used multiple times in a single application to define multiple queue connections with different configuration options.

A queue provider must extend the AbstractQueueProvider and implement the required abstract methods:

  • public any function push( required string queue, required string payload, numeric delay, numeric attempt )
  • public function function startWorker( required WorkerPool pool )

Additionally, the Queue Provider can use the following hooks to do additional processing or cleanup:

  • private void function beforeJobRun( required AbstractJob job )
  • private void function afterJobFailed( required any id, AbstractJob job, WorkerPool pool )

Job

A job is a CFC that follows the IDispatchableJob interface (easily done by extending the AbstractJob component). It defines how to serialize the job using a memento pattern and deserialize the job from the queue. It also holds the data needed to execute the job and a handle method that is called when working the job from the queue. Job components exist in the context of your application so you have access to all the models, services, and helpers you have already written. (Raw string messages can also be dispatched via cbq. The message will need to be handled directly by your queue worker.)

component extends="cbq.models.Jobs.AbstractJob" {

    function handle() {
        sleep( 1000 ); // do some processing work
        log.info( "sending email - #this.getBody()#" );
    }

}

Providers

cbq provides the following providers out of the box:

  • SyncProvider
  • ColdBoxAsyncProvider
  • DBProvider

Future planned providers include:

  • Mock (for testing)
  • RabbitMQ
  • Redis
  • Couchbase

(See the ROADMAP for other planned protocols.)

Each of the providers takes different configuration when creating a connection. Refer to the specific provider documentation for details.

Installation and Setup

Install cbq from ForgeBox:

box install cbq

You can configure cbq in your moduleSettings inside config/ColdBox.cfc as follows:

moduleSettings = {
    "cbq" : {
        // The path to the custom config file used to register connections and worker pools
        "configPath" : "config.cbq",
        // Flag if workers should be registered.  If your application only pushes to the queues, you can set this to false.
        "registerWorkers" : getSystemSetting( "CBQ_REGISTER_WORKERS", true ),
        // The interval to poll for changes to the worker pool scaling.  Defaults to 0 which turns off the scheduled scaling feature.
        "scaleInterval" : 0
    }
};

Most of the configuration for cbq happens inside the cbq config file, located at config/cbq.cfc by convention.

component {

    function configure() {
        newConnection( "default" )
            .setProvider( "SyncProvider@cbq" );

        newWorkerPool( "default", "default" );
    }

}

In configure you define one or more Connections. You must have at least one Connection called default.

New Connections are created using the newConnection function, which returns a builder object. Only the provider is required; the other setters are optional.

newConnection( connectionName )
    .setProvider( providerMapping )
    .onQueue( name = "default" )
    .markAsDefault( /* true / false */ );
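
As an illustration of the builder, the sketch below registers two connections in the config file. The connection name background, the queue name emails, and the ColdBoxAsyncProvider@cbq mapping (inferred from the SyncProvider@cbq naming pattern) are assumptions made for this example, so check them against your installed version before relying on them.

```cfml
// config/cbq.cfc (illustrative sketch, not a canonical configuration)
component {

    function configure() {
        // The required "default" connection, explicitly marked as the default.
        newConnection( "default" )
            .setProvider( "SyncProvider@cbq" )
            .markAsDefault( true );

        // A second, hypothetical connection that sends jobs to an "emails" queue.
        // The provider mapping below is assumed from the provider's name.
        newConnection( "background" )
            .setProvider( "ColdBoxAsyncProvider@cbq" )
            .onQueue( "emails" );
    }

}
```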

You can also define worker pools inside configure to work on queues for a given Connection that you defined previously.

New Worker Pools are created using the newWorkerPool function, which returns a builder object. All of the setters are optional.

newWorkerPool( name, connectionName )
    .quantity( numberOfWorkers )
    .onQueue( name = "default" )
    .backoff( backoffTimeInSeconds )
    .timeout( timeoutTimeInSeconds )
    .maxAttempts( maxNumberOfAttempts );
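
A sketch of how these setters might be combined to scale queues differently is shown here. The pool name emails, the queue name emails, and all numeric values are hypothetical; only the builder methods themselves come from the listing above.

```cfml
// config/cbq.cfc (illustrative sketch with made-up values)
component {

    function configure() {
        newConnection( "default" )
            .setProvider( "DBProvider@cbq" );

        // Three workers for the "default" queue of the "default" connection.
        newWorkerPool( "default", "default" )
            .quantity( 3 )
            .backoff( 5 )      // wait 5 seconds between attempts
            .timeout( 30 )     // treat a run as timed out after 30 seconds
            .maxAttempts( 3 );

        // One worker dedicated to a hypothetical "emails" queue on the same connection.
        newWorkerPool( "emails", "default" )
            .quantity( 1 )
            .onQueue( "emails" )
            .maxAttempts( 5 );
    }

}
```

Splitting pools this way lets a slow or low-priority queue be given fewer workers without starving the default queue.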

The config file follows ColdBox's environment overrides: if a method matching the environment name is found, it is called. Inside that method you can use the withConnection and withWorkerPool methods to change the properties of the connections and worker pools defined in configure:

component {

	function configure() {
		newConnection( "default" )
			.setProvider( "DBProvider@cbq" );

		newWorkerPool( "default", "default" )
			.setTimeout( 5 )
			.setMaxAttempts( 5 )
			.setQuantity( 3 );
	}

	function development() {
		withConnection( "default" )
			.setProvider( "SyncProvider@cbq" );

		withWorkerPool( "default" )
			.setMaxAttempts( 1 )
			.setQuantity( 1 );
	}

}

Usage

Job Components

Jobs are CFCs that extend cbq.models.Jobs.AbstractJob. You need to define a handle method that is run when the Job is processed.

// GreetingJob.cfc
component extends="cbq.models.Jobs.AbstractJob" {

    function handle() {
        // this is run when the job is processed
        log.debug( "Hello world!" );
    }

}

To dispatch a job to a queue to be worked, call the dispatch method on a Job instance. Dispatching a job serializes it and sends it to the configured connection. It will later be picked up by a worker and processed.

getInstance( "GreetingJob" ).dispatch();

A Job is sent to the queue with any of its properties serialized. You can set the properties of a job by calling the setProperties method and passing a struct of properties.

getInstance( "GreetingJob" )
    .setProperties( { "greeting": "Hello" } )
    .dispatch();

Additional Job-level properties can be set before dispatching to override Worker Pool defaults on a per-Job basis.

getInstance( "GreetingJob" )
    .setProperties( { "greeting": "Hello" } )
    .setDelay( 10 ) // delay processing this job for 10 seconds
    .dispatch();

CBQ Model

A cbq model exists to make certain job- and dispatch-related actions easier to perform. You can access it by injecting cbq@cbq or simply @cbq. Here are the methods available to you:

job

Creates a Job instance.

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| job | string or Job instance | true | | A job instance or mapping string to a Job instance. Additionally, any string may be provided here, even if it doesn't exist as a CFC. If so, cbq will create a NonExecutableJob with the given mapping. This can only be used if the instance dispatching the jobs will never work the jobs. |
| properties | struct | false | {} | A struct of properties for the new Job. |
| chain | Job[] | false | [] | An array of Job instances to chain after this one. |
| queue | string | false | null | The queue to run this Job on. Overrides the Job queue and the default queue, if provided. |
| backoff | numeric | false | null | The backoff amount in seconds between Job attempts. Overrides the Job backoff and the default backoff, if provided. |
| timeout | numeric | false | null | The timeout amount in seconds before a Job run is considered timed out. Overrides the Job timeout and the default timeout, if provided. |
| maxAttempts | numeric | false | null | The maxAttempts amount before a Job run is considered failed. Overrides the Job maxAttempts and the default maxAttempts, if provided. |
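
A minimal sketch of calling job from a service follows. The WelcomeService component, the SendWelcomeEmailJob mapping, and the emails queue are hypothetical names used only for illustration; the argument names are taken from the parameter list above.

```cfml
// models/WelcomeService.cfc (hypothetical service)
component {

    // Inject the cbq model.
    property name="cbq" inject="@cbq";

    function queueWelcomeEmail( required string email ) {
        // "SendWelcomeEmailJob" is a hypothetical mapping to a Job CFC.
        var job = variables.cbq.job(
            job = "SendWelcomeEmailJob",
            properties = { "email" : arguments.email },
            queue = "emails",
            maxAttempts = 3
        );

        // job() only creates the instance; dispatching is a separate step.
        job.dispatch();
    }

}
```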

dispatch

Creates a Job instance and immediately dispatches it.

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| job | string, Job instance, or array of Job instances | true | | A job instance or mapping string to a Job instance. Additionally, any string may be provided here, even if it doesn't exist as a CFC. If so, cbq will create a NonExecutableJob with the given mapping. This can only be used if the instance dispatching the jobs will never work the jobs. If an array of Job instances is passed, it is forwarded to chain and the resulting chain is dispatched. |
| properties | struct | false | {} | A struct of properties for the new Job. |
| chain | Job[] | false | [] | An array of Job instances to chain after this one. |
| queue | string | false | null | The queue to run this Job on. Overrides the Job queue and the default queue, if provided. |
| backoff | numeric | false | null | The backoff amount in seconds between Job attempts. Overrides the Job backoff and the default backoff, if provided. |
| timeout | numeric | false | null | The timeout amount in seconds before a Job run is considered timed out. Overrides the Job timeout and the default timeout, if provided. |
| maxAttempts | numeric | false | null | The maxAttempts amount before a Job run is considered failed. Overrides the Job maxAttempts and the default maxAttempts, if provided. |
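
The one-step form might look like the sketch below. As before, the component name, the SendWelcomeEmailJob mapping, and the numeric values are hypothetical; only the dispatch method and its argument names come from this document.

```cfml
// models/WelcomeService.cfc (hypothetical service)
component {

    property name="cbq" inject="@cbq";

    function queueWelcomeEmail( required string email ) {
        // Create the Job and send it to the queue in a single call.
        variables.cbq.dispatch(
            job = "SendWelcomeEmailJob", // hypothetical Job mapping
            properties = { "email" : arguments.email },
            backoff = 10, // seconds between attempts
            timeout = 60  // seconds before a run counts as timed out
        );
    }

}
```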

chain

Creates a Job Chain and returns the first Job in the chain. To dispatch the chain, you must call dispatch on the returned Job.

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| chain | Job[] | false | [] | An array of Job instances to chain after this one. |
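
A sketch of building and dispatching a chain, assuming the array is passed as the first argument: ImportService, ImportUsersJob, and SendImportReportJob are hypothetical names, and the Job instances are created with the job method described earlier.

```cfml
// models/ImportService.cfc (hypothetical service)
component {

    property name="cbq" inject="@cbq";

    function importAndReport() {
        variables.cbq
            .chain( [
                variables.cbq.job( "ImportUsersJob" ),     // hypothetical Job mapping
                variables.cbq.job( "SendImportReportJob" ) // hypothetical Job mapping
            ] )
            // chain() returns the first Job; nothing is queued until dispatch() is called.
            .dispatch();
    }

}
```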

v3.0.12

03 Sep 2024, 15:53:21 UTC

other

  • *: fix: Update syntax for Lucee 6.1 and ACF (a4f4502)

v3.0.11

07 Aug 2024, 18:40:35 UTC

other

  • *: Fix missing shutdownTimeout variable (9bb553c)

v3.0.10

07 Aug 2024, 16:37:29 UTC

fix

  • Batch: Don't override Batch job queues unless one is specifically requested (6926067)

v3.0.9

25 Jun 2024, 16:46:56 UTC

fix

  • SyncProvider: Fix stack overflow when releasing a job too many times (5997087)

v3.0.8

12 Jun 2024, 17:21:18 UTC

fix

  • SyncProvider: Add chained jobs to the Sync job memento. (b1af5aa)

v3.0.7

23 May 2024, 21:34:49 UTC

fix

  • SyncProvider: Un-nest chains to prevent stack overflows (b89c81a)

v3.0.6

23 May 2024, 17:07:59 UTC

fix

  • FailedJobs: Fix variable name (b6e5f40)

v3.0.5

23 May 2024, 17:03:25 UTC

fix

  • FailedJobs: Inject logbox into the interceptor (f759d4f)

v3.0.4

23 May 2024, 16:58:00 UTC

fix

  • FailedJobs: Use CF_SQL_VARCHAR as the SQL type for originalId (041f34b)

v3.0.3

23 May 2024, 16:40:10 UTC

fix

  • FailedJobs: Make originalId able to track all provider ids (ecbade0)

v3.0.2

21 May 2024, 14:01:55 UTC

fix

  • FailedJobs: Log errors logging failed jobs (fd4bffa)

v3.0.1

16 May 2024, 15:48:35 UTC

fix

  • DBProvider: Fix releasing job timeouts using the wrong value (276a473)

v3.0.0

16 May 2024, 14:58:33 UTC

BREAKING

  • FailedJobs: Use a unix timestamp as the failed job log failedDate column type (0f36ad8)
  • DBProvider: Better locking to avoid duplicate runs of the same job (eed4c61)

chore

  • DBProvider: Remove lockForUpdate flag and add debug logging (413760f)

feat

  • WorkerPool: Make shutdown timeout configurable (575651c)

fix

  • DBProvider: Allow picking up of jobs that were previously reserved but not released correctly (67c3659)
  • DBProvider: When claiming a job, the DBProvider should extend the availableDate by the job timeout, not backoff. (bb0b0e2)
  • WorkerPool: Correctly shutdown worker pools (fcaee85)
  • DBProvider: Fix for unwrapping an Optional in a log message (9025919)

other

  • *: feat: Add clean-up tasks for completed or failed jobs, failed job logs, and completed or cancelled batches. (80ee9e9)
  • *: v3.0.0-beta.1 (139302b)

v2.1.0

13 Feb 2024, 23:42:12 UTC

feat

  • DBProvider: Add back ability to work on multiple queues (364b9ca)
  • Interceptors: Add ability to restrict interceptor execution with jobPattern (552e8ae)
  • Job: Add support for before and after lifecycle methods (8cf8390)

v2.0.5

09 Nov 2023, 16:58:45 UTC

fix

  • DBProvider: Disable forceRun until we figure out why it's losing mappings (4418d4c)

v2.0.4

06 Nov 2023, 19:33:46 UTC

other

  • *: fix: reload module mappings in an attempt to work around ColdBox Async losing them (152e282)
