NPM badge
CI badge
Coverage badge
Zorb is the natural successor to Admiral. It’s a FIFO queue that also supports repeated jobs (called “tasks”) that run at intervals:
- Write “actions” that complete a small piece of functionality.
- Combine actions together in order as a “recipe”, defining what payload to expect.
- Queue one-time jobs, passing a payload to the recipe and executing all the actions.
- Queue infinitely repeating tasks, again passing a payload to the recipe and executing all the actions, at specific intervals.
- Optionally check the status of jobs with a management interface.
- ES6 async/await by default, with Node >=7.9.0 support.
For alternatives, check out Kue - a FIFO-only queue - which features a management interface as standard & backed by Redis. Or Admiral - distributed repeating job-scheduling library - which doesn’t have a management interface but is also backed by Redis.
- Writing actions
- Writing Recipes
- Creating the queue
- Adding jobs & tasks
- Processing the queue
- Listening to Events
- Pre-defined actions
- zorb/log
- zorb/if
- zorb/assert
- zorb/forEach
- zorb/createJob
- Processing the queue under Cluster
Writing actions
const actions = zorb.createActionsManager();
actions.register('url-parse', {
payload: {
src: { required: true, type: 'string' },
dest: { required: true, type: 'string' }
},
async action({ get, set }, { src, dest }) {
const parsed = url.parse(get(src), true);
set(dest, parsed);
}
});
Actions are the building blocks of your queue. An ActionsManager allows you to register as many actions as you like. You can create as many ActionManagers as you require, but only one ActionManager can be registered to a queue.
Your (optionally async
) action
will be passed the context
and payload
objects as arguments, which you are encouraged to destructure & use. Whilst payload
simply contains the other properties in the object used to call your action (in the example above, src
& dest
), the context
object includes useful functions relevant to the recipe, including:
get(key[, default])
returns the data located atkey
, or the raw value ofkey
if the key doesn’t start with$.
. Ifdefault
is present &key
points to a falsey value,default
will be returned, otherwise an error will be thrown.set(key, value)
will modify the job’s data object, so data can be passed between actions. Remember to startkey
with$.
otherwise an error will be thrown.log({ ... })
offers a function to log messages for your actions.
Writing Recipes
const recipes = zorb.createRecipeManager();
recipes.register('publish-youtube-video-to-facebook', {
name: 'YouTube video to Facebook page',
description: 'Cross-publish a specific YouTube video to a Facebook page',
payload: {
video_url: { required: true, type: 'string' },
page_id: { required: true, type: 'string' }
},
steps: [
{
action: 'url-parse',
src: '$.payload.video_url',
dest: '$.url'
},
{
action: 'zorb/if',
condition: {
type: 'contains',
key: '$.url.hostname',
value: 'youtube.com'
},
then: {
action: 'zorb/assert',
type: 'equals',
key: '$.url.path',
value: '/watch'
}
},
{
action: 'extract-embed',
src: '$.payload.video_url',
dest: '$.embed'
},
{
action: 'zorb/assert',
type: 'notEmpty',
key: '$.embed.title'
},
{
action: 'publish-to-facebook',
page: '$.payload.page_url',
message: [ 'New video: %s', '$.embed.title' ],
attachment: '$.payload.video_url'
}
]
});
const recipes = zorb.createRecipeManager();
recipes.register('publish-youtube-video-to-facebook', {
name: 'YouTube video to Facebook page',
description: 'Cross-publish a specific YouTube video to a Facebook page',
payload: {
video_url: { required: true, type: 'string' },
page_id: { required: true, type: 'string' }
},
steps: {
parseUrl: {
use: 'url-parse',
src: '$.payload.video_url',
dest: '$.url'
},
confirmYouTube: {
use: 'zorb/if',
condition: {
type: 'contains',
key: '$.url.hostname',
value: 'youtube.com'
},
then: {
use: 'zorb/assert',
key: '$.url.path',
value: '/watch'
}
},
extractEmbed: {
use: 'extract-embed',
src: '$.payload.video_url',
dest: '$.embed'
},
checkEmbedTitle: {
use: 'zorb/assert',
type: 'notEmpty',
key: '$.embed.title'
},
publishToFacebook: {
use: 'publish-to-facebook',
page: '$.payload.page_url',
message: [ 'New video: %s', '$.embed.title' ],
attachment: '$.payload.video_url'
}
]
});
Recipes connect the dots of your queue. Like the ActionsManager, a RecipeManager allows you to register as many recipes as you like. And again, you can create as many RecipeManagers as you require but only one RecipeManager can be registered to a queue.
Creating the queue
const redis = require('redis').createClient({
url: 'redis://user:password@redis.local:2468',
});
const queue = zorb.createQueue({ actions, recipes, redis });
The QueueManager lets you create new jobs & tasks, by specifying the recipe
you wish to execute & the payload
for that recipe. The payload
will be validated against the descriptor that the recipe provided.
When writing recipes, especially when using multiple ActionManagers, be wary about which ActionManager will be present at the point you create the queue
object. When you create the queue
via createQueue
the recipes
will be validated using actions
, and the queue will fail to instantiate.
Adding jobs & tasks
const job = await queue.createJob({
recipe: 'publish-youtube-video-to-facebook',
payload: {
video_url: 'https://www.youtube.com/watch?v=Sd2v5npUcDI',
page_id: '1010111011011101',
},
});
const task = await queue.createTask({
recipe: 'publish-youtube-channel-to-facebook',
payload: {
channel_url: 'https://www.youtube.com/theverge',
page_id: '1010111011011101'
},
});
Processing the queue
Now that you’ve queued some jobs & tasks, you’ll probably want to have a separate container/process to run through the queue as new items are added. For that, a processQueue
function exists:
const zorb = require('zorb');
const queue = zorb.createQueue({ actions, recipes, redis });
zorb.processQueue(queue);
Unlike libraries like Kue, each container/process of Zorb will execute every type of job (since the queue has knowledge of the recipes), so if you want concurrency with your queue then configure your autoscaling rules to have two or more containers/processes running or use the cluster module to execute processQueue
multiple times.
Listening to Events
In order to get some insight into what’s happening with the queue, events are emitted at various stages:
job-queued
&task-queued
job-active
&task-active
job-completed
&task-completed
job-errored
&task-errored
Jobs & tasks move through these states, although keep in mind all tasks will never keep a completed
or errored
state & will return to a queued
state, although the relevant events will be emitted.
Pre-defined actions
Zorb includes some default actions to help you structure your recipes. These are not required for recipes - you know what’s best for your usage - they exist to avoid reinventing the wheel!
All pre-defined actions are prefixed with zorb/
. The top-level namespace will always belong to you!
zorb/log
{
"use": "zorb/log",
"level": "DEBUG",
"message": [ "Hello, %s!", "world", "$.payload.user_id" ]
}
A simple way to write logs mid-recipe. You can also write logs inside your own actions, which we’ll get into later.
zorb/if
{
"use": "zorb/if",
"condition": {
"key": "$.url.hostname",
"type": "equals", // default
"value": "youtube.com"
},
"then": {
"use": "zorb/log",
"message": "Hurrah, it’s a YouTube video",
"args": [ "$.url.query.v" ]
}
}
A simple if-statement for recipes: execute then
if condition
is true.
zorb/assert
{
"use": "zorb/assert",
"type": "equals",
"key": "$.post.type",
"value": "video"
}
Just like the built-in Node module of the same name, assert
throws an error and stops the recipe if the assertion fails. Fun fact: assert
uses the same conditions as if
underneath, and throws an error if the result is false
!
zorb/forEach
{
"use": "zorb/forEach",
"src": "$.feed",
"iterate": {
"type": "zorb/log",
"message": "forEach",
"args": [ "$.payload.profile_url", "$i", "$v.id" ]
}
}
Loop through src
, executing iterate
. Along with your recipe’s data at $.
, iterations have $i
& $v
, the index and the value respectively.
zorb/createJob
{
"action": "zorb/createJob",
"recipe": "check-for-new-entries",
"payload": {
"video_url": "$.payload.video_url",
"channel_id": "$.profile.id"
}
}
A function to add new jobs to the queue. It passes the arguments as queue.createJob
.
Processing the queue under Cluster
The example below shows how you may use Cluster to spread the queue processing load across CPUs.
const cluster = require('cluster');
const zorb = require('zorb');
const cpus = require('os').cpus().length;
const queue = zorb.createQueue({ actions, recipes, redis });
if (cluster.isMaster) {
for (var i = 0; i < cpus; i++) {
cluster.fork();
}
} else {
zorb.processQueue(queue);
}
Check out Cluster’s documentation for more examples & options, but the basic concept is there. This will create a worker for each CPU core on your instance, so you can handle N
jobs/tasks concurrently.
This project is under MIT License.