Introducing... Zorb!

Zorb is the natural successor to Admiral. It’s a FIFO queue that also supports repeated jobs (called “tasks”) that run at intervals:

  • Write “actions” that complete a small piece of functionality.
  • Combine actions together in order as a “recipe”, defining what payload to expect.
  • Queue one-time jobs, passing a payload to the recipe and executing all the actions.
  • Queue infinitely repeating tasks, again passing a payload to the recipe and executing all the actions, at specific intervals.
  • Optionally check the status of jobs with a management interface.
  • ES2017 async/await by default, supporting Node >=7.9.0.

For alternatives, check out Kue, a FIFO-only queue backed by Redis that features a management interface as standard, or Admiral, a distributed repeating job-scheduling library that is also backed by Redis but doesn’t have a management interface.

Writing actions

const actions = zorb.createActionsManager();
actions.register('url-parse', {
	payload: {
		src: { required: true, type: 'string' },
		dest: { required: true, type: 'string' }
	},
	async action({ get, set }, { src, dest }) {
		const parsed = url.parse(get(src), true);
		set(dest, parsed);
	}
});

Actions are the building blocks of your queue. An ActionsManager allows you to register as many actions as you like. You can create as many ActionsManagers as you require, but only one ActionsManager can be registered to a queue.

Your (optionally async) action will be passed the context and payload objects as arguments, which you are encouraged to destructure & use. Whilst payload simply contains the other properties in the object used to call your action (in the example above, src & dest), the context object includes useful functions relevant to the recipe, including:

  • get(key[, default]) returns the data located at key, or the raw value of key if key doesn’t start with $. (a dollar sign followed by a dot). If key points to a falsey value, default is returned when present; otherwise an error is thrown.
  • set(key, value) will modify the job’s data object, so data can be passed between actions. Remember to start key with $. otherwise an error will be thrown.
  • log({ ... }) offers a function to log messages for your actions.
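
To make the get and set rules above concrete, here is a minimal sketch of a context object that follows them. It is not Zorb’s implementation: createContext and its error messages are hypothetical names chosen for illustration, and the real context may behave differently in edge cases.

```javascript
// Hypothetical sketch of a recipe context; not Zorb's actual implementation.
function createContext(data) {
	// Walk a '$.a.b' path through the job's data object.
	const resolve = key => key.slice(2).split('.').reduce(
		(value, part) => (value === null || value === undefined ? undefined : value[part]),
		data
	);

	return {
		get(key, fallback) {
			// Anything that isn't a '$.' path is treated as a literal value.
			if (typeof key !== 'string' || !key.startsWith('$.')) return key;
			const value = resolve(key);
			if (value) return value;
			// Falsey value: use the default if one was passed, otherwise throw.
			if (arguments.length > 1) return fallback;
			throw new Error(`No value found at ${key}`);
		},
		set(key, value) {
			if (typeof key !== 'string' || !key.startsWith('$.')) {
				throw new Error(`Key must start with "$.": ${key}`);
			}
			const parts = key.slice(2).split('.');
			const last = parts.pop();
			// Create intermediate objects as needed.
			const target = parts.reduce((obj, part) => {
				if (typeof obj[part] !== 'object' || obj[part] === null) obj[part] = {};
				return obj[part];
			}, data);
			target[last] = value;
		},
	};
}

const { get, set } = createContext({ payload: { video_url: 'https://example.com/watch' } });
set('$.url', { hostname: 'example.com' });
get('$.url.hostname');   // 'example.com'
get('plain string');     // 'plain string'
get('$.missing', 'n/a'); // 'n/a'
```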

Writing Recipes

const recipes = zorb.createRecipeManager();
recipes.register('publish-youtube-video-to-facebook', {  
	name: 'YouTube video to Facebook page',  
	description: 'Cross-publish a specific YouTube video to a Facebook page',  
	payload: {
		video_url: { required: true, type: 'string' },
		page_id: { required: true, type: 'string' }
	},  
	steps: [    
		{      
			action: 'url-parse',      
			src: '$.payload.video_url',      
			dest: '$.url'    
		},    
		{      
			action: 'zorb/if',      
			condition: {        
				type: 'contains',        
				key: '$.url.hostname',
				value: 'youtube.com'
			},
			then: {
				action: 'zorb/assert',
				type: 'equals',
				key: '$.url.path',
				value: '/watch'
			}
		},
		{
			action: 'extract-embed',
			src: '$.payload.video_url',
			dest: '$.embed'
		},
		{
			action: 'zorb/assert',
			type: 'notEmpty',
			key: '$.embed.title'
		},
		{
			action: 'publish-to-facebook',
			page: '$.payload.page_id',
			message: [ 'New video: %s', '$.embed.title' ],
			attachment: '$.payload.video_url'
		}
	]
});

// The same recipe, written with named steps (use instead of action)
const recipes = zorb.createRecipeManager();
recipes.register('publish-youtube-video-to-facebook', {  
	name: 'YouTube video to Facebook page',  
	description: 'Cross-publish a specific YouTube video to a Facebook page',  
	payload: {
		video_url: { required: true, type: 'string' },
		page_id: { required: true, type: 'string' }
	},  
	steps: {
		parseUrl: {
			use: 'url-parse',      
			src: '$.payload.video_url',      
			dest: '$.url'    
		},
		confirmYouTube: {
			use: 'zorb/if',      
			condition: {        
				type: 'contains',        
				key: '$.url.hostname',       
				value: 'youtube.com'
			},
			then: {
				use: 'zorb/assert',
				key: '$.url.path',
				value: '/watch'
			}
		},
		extractEmbed: {
			use: 'extract-embed',
			src: '$.payload.video_url',
			dest: '$.embed'
		},
		checkEmbedTitle: {
			use: 'zorb/assert',
			type: 'notEmpty',
			key: '$.embed.title'
		},
		publishToFacebook: {
			use: 'publish-to-facebook',
			page: '$.payload.page_id',
			message: [ 'New video: %s', '$.embed.title' ],
			attachment: '$.payload.video_url'
		}
	}
});

Recipes connect the dots of your queue. Like the ActionsManager, a RecipeManager allows you to register as many recipes as you like. And again, you can create as many RecipeManagers as you require but only one RecipeManager can be registered to a queue.

Creating the queue

const redis = require('redis').createClient({
	url: 'redis://user:password@redis.local:2468',
});

const queue = zorb.createQueue({ actions, recipes, redis });

The QueueManager lets you create new jobs & tasks, by specifying the recipe you wish to execute & the payload for that recipe. The payload will be validated against the descriptor that the recipe provided.
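
As a rough illustration of what validating a payload against a recipe’s descriptor could involve, the sketch below checks the required and type fields used in the examples above. validatePayload, its return shape and its messages are assumptions made for this example, not part of Zorb’s API.

```javascript
// Hypothetical validator; the error messages and return shape are assumptions.
function validatePayload(descriptor, payload) {
	const errors = [];
	for (const [field, rule] of Object.entries(descriptor)) {
		const value = payload[field];
		if (value === undefined || value === null) {
			// Missing values only matter when the field is required.
			if (rule.required) errors.push(`${field} is required`);
			continue;
		}
		if (rule.type && typeof value !== rule.type) {
			errors.push(`${field} must be of type ${rule.type}`);
		}
	}
	return errors;
}

const descriptor = {
	video_url: { required: true, type: 'string' },
	page_id: { required: true, type: 'string' },
};

validatePayload(descriptor, { video_url: 'https://www.youtube.com/watch?v=Sd2v5npUcDI' });
// → [ 'page_id is required' ]
```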

When writing recipes, especially when using multiple ActionsManagers, be wary about which ActionsManager will be present at the point you create the queue object. When you create the queue via createQueue, the recipes are validated against actions, and the queue will fail to instantiate if a recipe refers to an action that isn’t registered.
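
One way to catch this before calling createQueue is to scan a recipe’s steps for action names that haven’t been registered. The helper below is a hypothetical sketch: it assumes steps is an array, that names starting with zorb/ are built in, and that nested steps live under then or iterate.

```javascript
// Hypothetical check: collect action names used by a recipe that are not registered.
// Assumes array-style steps, built-ins prefixed with 'zorb/', and nested steps
// under 'then' or 'iterate'.
function findUnknownActions(steps, registered) {
	const unknown = [];
	const visit = step => {
		const name = step.use || step.action;
		if (name && !name.startsWith('zorb/') && !registered.has(name)) unknown.push(name);
		// Recurse into nested steps, if any.
		[step.then, step.iterate].filter(Boolean).forEach(visit);
	};
	steps.forEach(visit);
	return unknown;
}

const registered = new Set(['url-parse']);

findUnknownActions([
	{ action: 'url-parse', src: '$.payload.video_url', dest: '$.url' },
	{
		action: 'zorb/if',
		condition: { key: '$.url.hostname', value: 'youtube.com' },
		then: { action: 'extract-embed' },
	},
], registered);
// → [ 'extract-embed' ]
```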

Adding jobs & tasks

const job = await queue.createJob({
	recipe: 'publish-youtube-video-to-facebook',
	payload: {
		video_url: 'https://www.youtube.com/watch?v=Sd2v5npUcDI',
		page_id: '1010111011011101',
	},
});

const task = await queue.createTask({
	recipe: 'publish-youtube-channel-to-facebook',
	payload: {
		channel_url: 'https://www.youtube.com/theverge',
		page_id: '1010111011011101'
	},
});

Processing the queue

Now that you’ve queued some jobs & tasks, you’ll probably want to have a separate container/process to run through the queue as new items are added. For that, a processQueue function exists:

const zorb = require('zorb');

const queue = zorb.createQueue({ actions, recipes, redis });

zorb.processQueue(queue);

Unlike libraries like Kue, each container/process of Zorb will execute every type of job (since the queue has knowledge of the recipes). If you want concurrency with your queue, configure your autoscaling rules to run two or more containers/processes, or use the cluster module to execute processQueue multiple times.

Listening to Events

In order to get some insight into what’s happening with the queue, events are emitted at various stages:

  • job-queued & task-queued
  • job-active & task-active
  • job-completed & task-completed
  • job-errored & task-errored

Jobs & tasks move through these states. Keep in mind that tasks never stay in a completed or errored state: they return to the queued state, although the relevant events are still emitted.
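
The sketch below shows the event sequence a repeating task would produce under that description. It uses Node’s built-in EventEmitter as a stand-in for the queue; whether Zorb’s queue exposes an on() method, and what it passes to listeners, are assumptions here.

```javascript
const EventEmitter = require('events');

// Stand-in for the queue: this sketch assumes events are delivered through an
// EventEmitter-style on() method, which is not confirmed by this README.
const queue = new EventEmitter();
const seen = [];

// Listen for every job/task state listed above.
['queued', 'active', 'completed', 'errored'].forEach(state => {
	['job', 'task'].forEach(kind => {
		queue.on(`${kind}-${state}`, item => seen.push(`${kind}-${state}:${item.id}`));
	});
});

// Simulate a repeating task: it completes, then returns to the queued state.
const task = { id: 'task-1' };
['task-queued', 'task-active', 'task-completed', 'task-queued']
	.forEach(event => queue.emit(event, task));

console.log(seen);
```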

Pre-defined actions

Zorb includes some default actions to help you structure your recipes. These are not required for recipes - you know what’s best for your usage - they exist to avoid reinventing the wheel!

All pre-defined actions are prefixed with zorb/. The top-level namespace will always belong to you!

zorb/log

{
	"use": "zorb/log",
	"level": "DEBUG",
	"message": [ "Hello, %s!", "world", "$.payload.user_id" ]
}

A simple way to write logs mid-recipe. You can also write logs inside your own actions, using the log function on the context object.
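
The message array above reads like a printf-style template followed by its arguments. Assuming that is the intent, and assuming formatting similar to Node’s util.format, a message could be built as in this sketch. formatMessage is a hypothetical helper, and get is a simplified stand-in for the context’s get function described under Writing actions.

```javascript
const util = require('util');

// Hypothetical helper: resolve each '$.' reference, then apply printf-style formatting.
function formatMessage(message, get) {
	const [template, ...args] = Array.isArray(message) ? message : [message];
	return util.format(template, ...args.map(arg => get(arg)));
}

// Simplified stand-in for the context's get(): only resolves '$.payload.*' keys.
const data = { payload: { user_id: 'u42' } };
const get = key => (key.startsWith('$.payload.') ? data.payload[key.slice(10)] : key);

formatMessage(['Hello, %s!', 'world', '$.payload.user_id'], get);
// → 'Hello, world! u42'
```

Note that util.format appends arguments without a matching placeholder to the end of the string, separated by spaces.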

zorb/if

{
	"use": "zorb/if",
	"condition": {
		"key": "$.url.hostname",
		"type": "equals", // default
		"value": "youtube.com"
	},
	"then": {
		"use": "zorb/log",
		"message": "Hurrah, it’s a YouTube video",
		"args": [ "$.url.query.v" ]
	}
}

A simple if-statement for recipes: execute then if condition is true.

zorb/assert

{
	"use": "zorb/assert",
	"type": "equals",
	"key": "$.post.type",
	"value": "video"
}

Just like the built-in Node module of the same name, assert throws an error and stops the recipe if the assertion fails. Fun fact: assert uses the same conditions as if underneath, and throws an error if the result is false!
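
The relationship between if and assert can be sketched as two thin wrappers around one condition evaluator. Everything below is hypothetical (evaluate, runIf, runAssert and the exact semantics of each condition type); it only covers the three types that appear in this README.

```javascript
// Hypothetical condition evaluator covering the three types used in this README.
const conditions = {
	equals: (actual, expected) => actual === expected,
	contains: (actual, expected) =>
		(typeof actual === 'string' || Array.isArray(actual)) && actual.includes(expected),
	notEmpty: actual =>
		actual !== undefined && actual !== null && actual !== '' &&
		!(Array.isArray(actual) && actual.length === 0),
};

function evaluate({ type = 'equals', key, value }, get) {
	if (!Object.prototype.hasOwnProperty.call(conditions, type)) {
		throw new Error(`Unknown condition type: ${type}`);
	}
	return conditions[type](get(key), value);
}

// zorb/if-style: run `then` only when the condition holds.
function runIf(step, get, run) {
	if (evaluate(step.condition, get)) run(step.then);
}

// zorb/assert-style: same evaluation, but a false result throws.
function runAssert(step, get) {
	if (!evaluate(step, get)) {
		throw new Error(`Assertion failed: ${step.type || 'equals'} on ${step.key}`);
	}
}

const data = { url: { hostname: 'www.youtube.com', path: '/watch' } };
const get = key => key.slice(2).split('.').reduce((value, part) => value[part], data);

evaluate({ type: 'contains', key: '$.url.hostname', value: 'youtube.com' }, get); // true
runAssert({ key: '$.url.path', value: '/watch' }, get); // passes silently
```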

zorb/forEach

{
	"use": "zorb/forEach",
	"src": "$.feed",
	"iterate": {
		"use": "zorb/log",
		"message": "forEach",
		"args": [ "$.payload.profile_url", "$i", "$v.id" ]
	}
}

Loop through src, executing iterate. Along with your recipe’s data at $., iterations have $i & $v, the index and the value respectively.

zorb/createJob

{
	"action": "zorb/createJob",
	"recipe": "check-for-new-entries",
	"payload": {
		"video_url": "$.payload.video_url",
		"channel_id": "$.profile.id"
	}
}

An action that adds new jobs to the queue. It takes the same arguments as queue.createJob.

Processing the queue under Cluster

The example below shows how you may use Cluster to spread the queue processing load across CPUs.

const cluster = require('cluster');
const zorb = require('zorb');
const cpus = require('os').cpus().length;

const queue = zorb.createQueue({ actions, recipes, redis });

if (cluster.isMaster) {
	for (var i = 0; i < cpus; i++) {
		cluster.fork();
	}
} else {
	zorb.processQueue(queue);
}

Check out Cluster’s documentation for more examples & options, but the basic concept is there. This will create a worker for each CPU core on your instance, so you can handle N jobs/tasks concurrently.

This project is under MIT License.