BoxLang 🚀 A New JVM Dynamic Language Learn More...
Adobe 2018+ or Lucee 5+ ColdBox 6+
A queue connection defines how to connect to a backend service like Redis, RabbitMQ, or even a database. Any given queue connection can have multiple "queues" which are named stacks of queued jobs or messages to be delivered.
A named stack of jobs or messages to be delivered. A queue connection must have at least one queue which is usually "default". A queue connection can have as many queues as desired. This is mostly used later when defining queue workers to scale different queues at different priorities.
A queue provider is how a queue connection connects to a backend service like Redis, RabbitMQ, or a database. It implements the necessary interface to send the jobs and to work the queues. A queue provider can be used multiple times in a single application to define multiple queue connections with different configuration options.
A queue provider must extend the
AbstractQueueProvider
and implement the required abstract methods:
public any function push( required string queue, required
string payload, numeric delay, numeric attempt )
public function function startWorker( required WorkerPool pool )
Additionally, the Queue Provider can use the following hooks to do additional processing or cleanup:
private void function beforeJobRun( required AbstractJob job )
private void function afterJobFailed( required any id,
AbstractJob job, WorkerPool pool )
A job is a CFC that follows the IDispatchableJob
interface (easily done by extending the AbstractJob
component). It defines how to serialize the job using a memento
pattern and deserialize the job from the queue. It also holds the
data needed to execute the job and a handle
method that
is called when working the job from the queue. Job components exist
in the context of your application so you have access to all the
models, services, and helpers you have already written. (Raw string
messages can also be dispatched via cbq. The message will need to be
handled directly by your queue worker.)
component extends="cbq.models.Jobs.AbstractJob" {
function handle() {
sleep( 1000 ); // do some processing work
log.info( "sending email - #this.getBody()#" );
}
}
cbq provides the following providers out of the box:
Future planned providers include:
(See the ROADMAP for other planned protocols.)
Each of the providers takes different configuration when creating a connection. Refer to the specific provider documentation for details.
To install cbq, install it from ForgeBox:
box install cbq
You can configure cbq in your moduleSettings
inside
config/ColdBox.cfc
as follows:
moduleSettings = {
"cbq" : {
// The path the custom config file to register connections and worker pools
"configPath" : "config.cbq",
// Flag if workers should be registered. If your application only pushes to the queues, you can set this to false.
"registerWorkers" : getSystemSetting( "CBQ_REGISTER_WORKERS", true ),
// The interval to poll for changes to the worker pool scaling. Defaults to 0 which turns off the scheduled scaling feature.
"scaleInterval" : 0
}
};
Most of the configuration for cbq happens inside the cbq config file,
located at config/cbq.cfc
by convention.
component {
function configure() {
newConnection( "default" )
.setProvider( "SyncProvider@cbq" );
newWorkerPool( "default", "default" );
}
}
In configure
you define one or more
Connection
s. You must have at least one
Connection
called default
.
New Connection
s are created using the
newConnection
function. It is a builder pattern object.
Only a provider
must be set. The other setters are optional.
newConnection( connectionName )
.setProvider( providerMapping )
.onQueue( name = "default" )
.markAsDefault( /* true / false */ ); .
You can also define worker pools inside configure
to
work on queue
s for a given Connection
that
you defined previously.
New Worker Pool
s are created using the
newWorkerPool
function. It is a builder pattern object.
All of the setters are optional.
newWorkerPool( name, connectionName )
.quantity( numberOfWorkers )
.onQueue( name = "default" )
.backoff( backoffTimeInSeconds )
.timeout( timeoutTimeInSeconds )
.maxAttempts( maxNumberOfAttempts );
The config file follows ColdBox's environment overrides by calling a
method matching the environment name if it is found. Inside that
method you can use the withConnection
and
withWorkerPool
methods to change the properties of
connections defined in configure
and worker pools defined
in work
:
component {
function configure() {
newConnection( "default" )
.setProvider( "DBProvider@cbq" );
newWorkerPool( "default", "default" )
.setTimeout( 5 )
.setMaxAttempts( 5 )
.setQuantity( 3 );
}
function development() {
withConnection( "default" )
.setProvider( "SyncProvider@cbq" );
withWorkerPool( "default" )
.setMaxAttempts( 1 )
.setQuantity( 1 );
}
}
Job
s are CFCs that extend
cbq.models.Jobs.AbstractJob
. You need to define a
handle
method that is ran when the Job
is processed.
// GreetingJob.cfc
component extends="cbq.models.Jobs.AbstractJob" {
function handle() {
// this is ran when the job is processed
log.debug( "Hello world!" );
}
}
To dispatch a job to a queue to be worked, call the
dispatch
method on a Job
instance.
Dispatching a job serializes it and sends it to the configured
connection. It will later be picked up by a worker and processed.
getInstance( "GreetingJob" ).dispatch();
A Job
is sent to the queue with any of its properties
serialized. You can set the properties of a job by calling the
setProperties
method and passing a struct of properties.
getInstance( "GreetingJob" )
.setProperties( { "greeting": "Hello" } )
.dispatch();
Additional Job-level properties can be set before dispatching to override Worker Pool defaults on a per-Job basis.
getInstance( "GreetingJob" )
.setProperties( { "greeting": "Hello" } )
.setDelay( 10 ) // delay processing this job for 10 seconds
.dispatch();
A cbq model exists to make certain job- and dispatch-related actions
easier to perform. You can access it by injecting
cbq@cbq
or simply @cbq
. Here are the methods
available to you:
job
Creates a Job instance.
Name | Type | Required | Default | Description |
---|---|---|---|---|
job | string or Job instance | true | A job instance or mapping
string to a Job instance. Additionally, any string may be
provided here, even if it doesn't exist as a CFC. If so, cbq
will create a NonExecutableJob with the given
mapping. This can only be used if the instance dispatching the
jobs will never work the jobs. | |
properties | struct | false | {}
| A struct of properties for the new Job. |
chain | Job[] | false | []
| An array of Job instances to chain after this one. |
queue | string | false | null
| The queue to run this Job on. Overrides the Job queue and the default queue, if provided. |
backoff | numeric | false | null
| The backoff amount in seconds between Job attempts. Overrides the Job backoff and the default backoff, if provided. |
timeout | numeric | false | null
| The timeout amount in seconds before a Job run is considered timed out. Overrides the Job timeout and the default timeout, if provided. |
maxAttempts | numeric | false | null
| The maxAttempts amount before a Job run is considered failed. Overrides the Job maxAttempts and the default maxAttempts, if provided. |
dispatch
Creates a Job instance and immediately dispatches it.
Name | Type | Required | Default | Description |
---|---|---|---|---|
job | string or Job instance OR array of Job instances | true | A job instance or
mapping string to a Job instance. Additionally, any string may
be provided here, even if it doesn't exist as a CFC. If so, cbq
will create a NonExecutableJob with the given
mapping. This can only be used if the instance dispatching the
jobs will never work the jobs. If an array of Job instances are
passed, this forwards it on to chain and dispatches
the chain. | |
properties | struct | false | {}
| A struct of properties for the new job. |
chain | Job[] | false | []
| An array of Job instances to chain after this one. |
queue | string | false | null
| The queue to run this Job on. Overrides the Job queue and the default queue, if provided. |
backoff | numeric | false | null
| The backoff in seconds amount between Job attempts. Overrides the Job backoff and the default backoff, if provided. |
timeout | numeric | false | null
| The timeout amount in seconds before a Job run is considered timed out. Overrides the Job timeout and the default timeout, if provided. |
maxAttempts | numeric | false | null
| The maxAttempts amount before a Job run is considered failed. Overrides the Job maxAttempts and the default maxAttempts, if provided. |
chain
Creates a Job Chain and returns the first Job in the chain. To
dispatch the chain, you must call dispatch
on the
returned Job.
Name | Type | Required | Default | Description |
---|---|---|---|---|
chain | Job[] | false | []
| An array of Job instances to chain after this one. |
lockForUpdate
flag and add debug logging
(413760f)jobPattern
(552e8ae)before
and after
lifecycle methods (8cf8390)originalId
(041f34b)release
a job inside the handle
method. (451b616)onFailure
method, call it when the job fails
(6b091f7)cbq.job
in case the job is not defined on this server
(14a6dfd)originalId
(041f34b)lockForUpdate
flag and add debug logging
(413760f)jobPattern
(552e8ae)before
and after
lifecycle methods (8cf8390)
$
box install cbq