Verk is a job processing system backed by Redis. It uses the same job definition of Sidekiq/Resque.
The goal is to be able to isolate the execution of a queue of jobs as much as possible.
Every queue has its own supervision tree:
- A pool of workers;
- A
QueueManager
that interacts with Redis to get jobs and enqueue them back to be retried if necessary; - A
WorkersManager
that will interact with theQueueManager
and the pool to execute jobs.
Verk will hold one connection to Redis per queue plus one dedicated to the ScheduleManager
and one general connection for other use cases like deleting a job from retry set or enqueuing new jobs.
The ScheduleManager
fetches jobs from the retry
set to be enqueued back to the original queue when it's ready to be retried.
It also has one GenEvent manager called EventManager
.
The image below is an overview of Verk's supervision tree running with a queue named default
having 5 workers.
Feature set:
- Retry mechanism with exponential backoff
- Dynamic addition/removal of queues
- Reliable job processing (RPOPLPUSH and Lua scripts to the rescue)
- Error and event tracking
First, add Verk to your mix.exs
dependencies:
def deps do
[{:verk, "~> 0.12"}]
end
and run $ mix deps.get
. Now, list the :verk
application as your
application dependency:
def application do
[applications: [:verk]]
end
Finally add Verk.Supervisor
to your supervision tree:
defmodule Example.App do
use Application
def start(_type, _args) do
import Supervisor.Spec
tree = [supervisor(Verk.Supervisor, [])]
opts = [name: Simple.Sup, strategy: :one_for_one]
Supervisor.start_link(tree, opts)
end
end
Verk was tested using Redis 2.8+
A job is defined by a module and arguments:
defmodule ExampleWorker do
def perform(arg1, arg2) do
arg1 + arg2
end
end
This job can be enqueued using Verk.enqueue/1
:
Verk.enqueue(%Verk.Job{queue: :default, class: "ExampleWorker", args: [1,2], max_retry_count: 5})
This job can also be scheduled using Verk.schedule/2
:
perform_at = Timex.shift(Timex.DateTime.now, seconds: 30)
Verk.schedule(%Verk.Job{queue: :default, class: "ExampleWorker", args: [1,2]}, perform_at)
Example configuration for verk having 2 queues: default
and priority
The queue default
will have a maximum of 25 jobs being processed at a time and priority
just 10.
config :verk, queues: [default: 25, priority: 10],
max_retry_count: 10,
poll_interval: 5000,
start_job_log_level: :info,
done_job_log_level: :info,
fail_job_log_level: :info,
node_id: "1",
redis_url: "redis://127.0.0.1:6379"
The configuration for releases is still a work in progress.
It's possible to dynamically add and remove queues from Verk.
Verk.add_queue(:new, 10) # Adds a queue named `new` with 10 workers
Verk.remove_queue(:new) # Terminate and delete the queue named `new`
Verk's goal is to never have a job that exists only in memory. It uses Redis as the single source of truth to retry and track jobs that were being processed if some crash happened.
Verk will re-enqueue jobs if the application crashed while jobs were running. It will also retry jobs that failed keeping track of the errors that happened.
The jobs that will run on top of Verk should be idempotent as they may run more than once.
One can track when jobs start and finish or fail. This can be useful to build metrics around the jobs. The QueueStats
handler does some kind of metrics using these events: https://github.com/edgurgel/verk/blob/master/lib/verk/queue_stats.ex
Verk has an Event Manager that notifies the following events:
Verk.Events.JobStarted
Verk.Events.JobFinished
Verk.Events.JobFailed
One can define an error tracking handler like this:
defmodule TrackingErrorHandler do
use GenEvent
def init(parent) do
{:ok, parent}
end
def handle_event(%Verk.Events.JobFailed{job: job, failed_at: failed_at, stacktrace: trace}, state) do
MyTrackingExceptionSystem.track(stacktrace: trace, name: job.class)
{ :ok, state }
end
def handle_event(_, state) do
# Ignore other events
{ :ok, state }
end
end
You also need to add the handler to connect with the event manager. You can do this in a number of ways:
- Managing the event handler with a GenServer:
defmodule TrackingErrorHandlerServer do
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, [])
end
def init(manager_name) do
case GenEvent.add_mon_handler(manager_name, TrackingErrorHandler, []) do
:ok -> {:ok, manager_name}
{:error, reason} -> {:stop, reason}
end
end
end
Then adding the GenServer to your supervision tree:
defmodule Example.App do
use Application
def start(_type, _args) do
import Supervisor.Spec
tree = [supervisor(Verk.Supervisor, []),
worker(TrackingErrorHandlerServer, [Verk.EventManager])]
opts = [name: Simple.Sup, strategy: :one_for_one]
Supervisor.start_link(tree, opts)
end
end
- Using Watcher to add the GenEvent to your supervision tree:
defmodule Example.App do
use Application
def start(_type, _args) do
import Supervisor.Spec
tree = [supervisor(Verk.Supervisor, []),
worker(Watcher, [Verk.EventManager, TrackingErrorHandler, []])]
opts = [name: Simple.Sup, strategy: :one_for_one]
Supervisor.start_link(tree, opts)
end
end
More info about GenEvent.add_mon_handler/3
here.
Check Verk Web!
Initial development sponsored by Carnival.io