Byplay

Clojure background job queue on top of PostgreSQL 9.5.

It allows creating background jobs, placing those jobs on multiple queues, and processing them later. Background jobs can be any named Clojure function.

The project is mostly inspired by Que, Resque and Celery.

Clojars Project

Features

Anti-Features

It hasn't been proven yet, but Byplay can experience the problem described in Que docs:

Que's job table undergoes a lot of churn when it is under high load, and like any heavily-written table, is susceptible to bloat and slowness if Postgres isn't able to clean it up. The most common cause of this is long-running transactions, so it's recommended to try to keep all transactions against the database housing Que's job table as short as possible. This is good advice to remember for any high-activity database, but bears emphasizing when using tables that undergo a lot of writes.

This PostgreSQL issue is explained in more detail in the article "Postgres Job Queues & Failure By MVCC".

Installation

Add dependency to your project:

[byplay "0.4.0"]

Require a namespace:

(ns my-app.core
  (:require
    [byplay.core :as b]
    ,,,))

Quickstart

Initialization

On your app start setup Byplay table and the accompanying indexes in the database (it's safe to call this function more than once):

(b/migrate jdbc-conn)

Here jdbc-conn is a "raw" JDBC connection. There are different ways to obtain such instance:

1) Via funcool/clojure.jdbc JDBC wrapper:

(with-open [conn (jdbc.core/connection dbspec)]
  (let [jdbc-conn (jdbc.proto/connection conn)]
    ,,,))

2) Via clojure/java.jdbc JDBC wrapper:

(clojure.java.jdbc/with-db-connection [conn db-spec]
  (let [jdbc-conn (clojure.java.jdbc/db-connection conn)]
    ,,,))

3) Via JDBC datasource (e.g. HikariCP):

(with-open [jdbc-conn (.getConnection datasource)]
    ,,,)

Job Definition

Define a job function:

(defn my-job
  [ctx x y z]
  (do-something-in-job-transaction1 (:jdbc-conn ctx))

  ; or if you use funcool/clojure.jdbc JDBC wrapper:
  (do-something-in-job-transaction2 (:conn ctx))
  ,,,)

Here (:jdbc-conn ctx) is a JDBC connection with the current transaction in progress and (:conn ctx) is the same connection wrapped by funcool/clojure.jdbc connection instance.

Scheduling

Put the job into :default queue:

(b/schedule jdbc-conn #'my-job 1 2 3)

Explicitly specify another queue using schedule-to:

(b/schedule-to jdbc-conn :my-queue #'my-job 1 2 3)

Or specify the queue in the job metadata at :byplay.core/queue key:

(defn ^{::b/queue :my-queue} my-job
  [ctx x y z]
  ,,,)

(b/schedule jdbc-conn #'my-job 1 2 3)

Working

Define an instance of funcool/clojure.jdbc database specification, e.g.:

(def dbspec {:classname   "org.postgresql.Driver"
             :subprotocol "postgresql"
             :subname     "//localhost:5432/myapp"})

Start a background worker with 2 concurrent work threads, each polling the specified queue for a new job every 5 seconds:

(b/start (b/new-worker dbspec {:threads-num      2
                               :queues           [:my-queue]
                               :polling-interval 5000
                               :on-fail          (fn on-fail
                                                   [worker exc {:keys [id job args queue state] :as _job}]
                                                   ,,,)}))

on-fail function will be called if exception is thrown from the job.

Shutdown

You can ask a worker to finish all currently running jobs and stop polling a database with interrupt method. For example this is how a worker can be gracefully stopped in the application shutdown hook:

(.addShutdownHook (Runtime/getRuntime)
                      (Thread. #(do
                                 ; stop the worker before other services (to not break jobs in progress)
                                 (doto worker b/interrupt b/join)

                                 ; stop other services
                                 ,,,)))

Tips

Jobs should be idempotent whenever possible.

Because in rare cases a job may be started more than once. E.g. a worker may die in the middle of a job execution leaving this job in new state.

Thanks to transactional guarantees, if job only updates the database then you don't have to worry about this problem. Just don't forget to use a connection from the job context.

Use database connection pool to speed things up.

See funcool/clojure.jdbc docs. Otherwise Byplay will create a new connection to the database on every poll.

Job signatures are important.

If you schedule a job and than rename its namespace/function than worker won't find the job var and will fail the task. Also be careful with changing job args.

Worker can throw exceptions in background threads.

It is possible that an exception can occur in the worker thread outside of a job function. By default such exceptions silently kill a background thread. So it's a good practice to be ready to explicitly detect them with Thread/setDefaultUncaughtExceptionHandler.

Documentation

More information can be found at the project site:

License

Copyright © 2016 Yuri Govorushchenko.

Released under an MIT license.