[004.2] Sidekiq Pro: Batches and Callbacks

Creating batches of jobs that can be monitored as a group, and having callbacks run when the batch is complete.

Sidekiq Pro: Batches and Callbacks [05.13.2016]

Batches are Sidekiq Pro's term for a collection of jobs that can be monitored as a group. You can create a set of jobs to execute in parallel and then run a callback when all the jobs are finished. They're a useful building block for taking advantage of parallelism or building complex workflows. Let's play with them a bit.

Project

Project Setup (off-video)

We're going to build a basic Rails application that has some ActiveRecord models for us to demonstrate this functionality:

rails new sidekiq_batches
cd sidekiq_batches
# We switch to postgres
vim config/database.yml
default: &default
  adapter: postgresql

development:
  <<: *default
  database: sidekiq_batches_development

test:
  <<: *default
  database: sidekiq_batches_test

production:
  <<: *default
  database: sidekiq_batches_production
vim Gemfile
gem 'pg'
rails g model Upload body:text completed:boolean
# We'll make the completed bool false by default
vim db/migrate/*_uploads.rb
      t.boolean :completed, default: false
rails g model Article title:string url:string body:text upload:references
rake db:create db:migrate
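rails c
# This assumes wiki_articles.csv is already in the project root
# (it gets copied in from ~/tmp further down)
csv = File.read("./wiki_articles.csv")
Upload.create(body: csv)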
# We'll add sidekiq-pro and mechanize to our Gemfile.
# The .envrc supplies SIDEKIQ_SOURCE_URL, which the Gemfile reads below.
cp ~/ruby/sidekiq_rails_demo/.envrc ./
direnv allow
vim Gemfile
gem 'mechanize'
source ENV["SIDEKIQ_SOURCE_URL"] do
  gem 'sidekiq-pro'
end
# We'll also add sinatra so we can use the web ui
gem 'sinatra'

We enable the Sidekiq Pro Web UI:

vim config/routes.rb
require 'sidekiq/pro/web'

Rails.application.routes.draw do
  mount Sidekiq::Web => '/sidekiq'
end
bundle
# We'll set sidekiq as our activejob adapter
vim config/initializers/active_job.rb
if Rails.env.test?
  ActiveJob::Base.queue_adapter = :inline
else
  ActiveJob::Base.queue_adapter = :sidekiq
end
cp ~/tmp/wiki_articles.csv ./
vim app/models/upload.rb
class Upload < ActiveRecord::Base
  def import!
    UploadWorker.perform_later(self.id)
  end
end
mkdir app/workers
vim app/workers/upload_worker.rb
require 'csv'

class UploadWorker < ActiveJob::Base
  queue_as :default

  def perform(upload_id)
    upload = Upload.find(upload_id)
    batch = Sidekiq::Batch.new
    batch.jobs do
      CSV.parse(upload.body) do |row|
        ArticleWorker.perform_later(upload_id, row)
      end
    end
  end
end
vim app/workers/article_worker.rb
class ArticleWorker < ActiveJob::Base
  queue_as :default

  def perform(upload_id, row)
    title, url = row
    agent = Mechanize.new
    agent.get(url)
    body = agent.page.body
    Article.create(upload_id: upload_id, title: title, body: body, url: url)
  end
end

Project Intro

We've got a basic Rails application in place with Sidekiq Pro set up, and I've created an Upload model in this project with a CSV file in its body:

rails c
u = Upload.first
puts u.body

We have an UploadWorker:

vim app/workers/upload_worker.rb

It will:

  • find an upload
  • create a new batch
  • parse the body of the upload, which is expected to be CSV
  • enqueue a new ArticleWorker job inside this batch for each row in the CSV.
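
To make the fan-out step concrete, here's a minimal sketch that runs without Sidekiq or a database. The `QUEUE` array and the `fan_out` method are stand-ins invented for this illustration; in the real worker, the enqueue call is `ArticleWorker.perform_later` inside `batch.jobs`.

```ruby
require 'csv'

# Hypothetical stand-in for a job queue: a plain array of [job_name, args].
QUEUE = []

# Mirrors the shape of UploadWorker#perform without Sidekiq or ActiveRecord:
# parse the CSV body and enqueue one ArticleWorker job per row.
def fan_out(upload_id, csv_body)
  CSV.parse(csv_body) do |row|
    QUEUE << ["ArticleWorker", [upload_id, row]]
  end
  QUEUE.size
end

body = <<~CSV
  Ruby,https://en.wikipedia.org/wiki/Ruby_(programming_language)
  Rust,https://en.wikipedia.org/wiki/Rust_(programming_language)
CSV

puts fan_out(1, body)
```

Each CSV row becomes exactly one queued job, and each job carries the upload id alongside its row.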

Let's look at ArticleWorker. It takes an upload_id and a row of parsed CSV data, and it will:

  • extract the data out of the row (this bit of code is coupled to the format of the CSV file)
  • create a new agent with Mechanize
  • fetch the URL
  • extract that page's body
  • create an Article record in our database with the title specified in the CSV file, the url specified in the CSV file, and the body of the page that was fetched.
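
Since the row handling is coupled to the CSV format, one option is to make that coupling explicit. The `parse_row` helper below is hypothetical (it isn't part of the project above); it's a sketch of how we might fail loudly when a row doesn't have the two fields we expect.

```ruby
# Hypothetical helper (not in the original project): turn a parsed CSV row
# into named fields, raising if the row doesn't match the expected shape.
def parse_row(row)
  unless row.is_a?(Array) && row.size == 2
    raise ArgumentError, "expected [title, url], got #{row.inspect}"
  end
  title, url = row
  { title: title, url: url }
end

article = parse_row(["Ruby", "https://en.wikipedia.org/wiki/Ruby_(programming_language)"])
puts article[:title]
```

A malformed row then fails that one job with a clear message instead of quietly creating an Article with a nil url.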

Because we're building a batch of these ArticleWorkers, each URL is processed as its own job, so one bad row doesn't stop the others: every row that can succeed will. We can also see whether the entire batch completed. This gives us easy parallelism from Sidekiq while still knowing whether all of the work in a given batch completed successfully.
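
Here's a small sketch of that idea using plain Ruby threads instead of Sidekiq. The `fake_fetch` lambda is a made-up stand-in for the Mechanize call so the example needs no network access; it isn't how Sidekiq schedules work, only an illustration of independent jobs where one failure doesn't affect the rest.

```ruby
rows = [
  ["KnownBadUrl", "https://thisinotavalidurl@nopenopenope"],
  ["Ruby", "https://en.wikipedia.org/wiki/Ruby_(programming_language)"],
  ["Rust", "https://en.wikipedia.org/wiki/Rust_(programming_language)"]
]

# Hypothetical stand-in for the Mechanize fetch; it fails for the known-bad row.
fake_fetch = lambda do |title, url|
  raise "could not fetch #{url}" if title == "KnownBadUrl"
  "<html>#{title}</html>"
end

# Each row runs in its own thread and rescues its own error, so one bad URL
# does not stop the others.
threads = rows.map do |title, url|
  Thread.new do
    begin
      { title: title, ok: true, body: fake_fetch.call(title, url) }
    rescue => e
      { title: title, ok: false, error: e.message }
    end
  end
end

# Thread#value waits for the thread to finish and returns the block's result.
results = threads.map(&:value)
succeeded, failed = results.partition { |r| r[:ok] }
puts "#{succeeded.size} succeeded, #{failed.size} failed"
```

With the sample spreadsheet this prints `2 succeeded, 1 failed`: the two Wikipedia rows go through even though the first row blows up.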

Adding jobs to a batch

For what it's worth, you can also add jobs to a running batch dynamically. This is useful for something like a website spider, where each fetched page can turn up more pages to crawl. We don't need it for this, but I wanted to mention it.

Fetching a batch's status

We'll start a batch on our upload:

upload = Upload.last
upload.import! # This calls our UploadWorker

Let's see if the batch completed successfully. First off, we'll check it out in the Web UI. Sidekiq Pro adds a 'Batches' tab up top:

rails s
# Visit http://localhost:3000/sidekiq.

OK, we can see that the batch didn't finish successfully. If we click into it, we can see the failures.

Oops, there's a bad URL in this file. We'll fix that and create a new upload.

vim wiki_articles.csv
# Delete the first line
csv = File.read("./wiki_articles.csv")
u = Upload.create(body: csv)
u.import!
# Now we can check the web ui again...except it probably finished up too
# quickly.  If it's not there, then it was successful, as they're deleted
# immediately on success.

# We can review the status of the failed batch though.
# Get the batch id from the web ui
bid = "9zECOxyJ3lPL8A" # Obviously will be different
status = Sidekiq::Batch::Status.new(bid)
status.total # jobs in the batch => 3
status.failures # failed jobs so far => 1
status.pending # jobs which have not succeeded yet => 1
status.created_at # => 2016-04-22 21:15:05 -0700
status.complete? # if all jobs have executed at least once => false
status.join # blocks until the batch is considered complete, note that some jobs might have failed
status.failure_info # an array of failed jobs
status.data # a hash of data about the batch which can easily be converted to JSON for javascript usage

OK, so that's a quick overview of the API available for batches and a look at the Web UI.

Callbacks

We have that completed boolean on our Upload model. We'd like to set it to true when the batch completes successfully, and leave it false if the batch fails. To do this, we can use Sidekiq Pro's batch callbacks. Let's modify the UploadWorker:

require 'csv'

class UploadWorker < ActiveJob::Base
  queue_as :default

  def perform(upload_id)
    upload = Upload.find(upload_id)
    batch = Sidekiq::Batch.new
    # Add this line.  It has to come before defining the batch's jobs.
    # On success, Sidekiq creates an instance of the given class and calls
    # its `on_success` method with the batch status and the provided argument
    batch.on(:success, UploadWorker, upload_id)
    batch.jobs do
      CSV.parse(upload.body) do |row|
        ArticleWorker.perform_later(upload_id, row)
      end
    end
  end

  def on_success(status, upload_id)
    # Here status is the same as the batch status we saw in the api above
    upload = Upload.find(upload_id)
    upload.update_attributes({ completed: true })
  end
end

Now restart Sidekiq so it picks up the changes.

We'll make a new Upload and import it:

csv = File.read("./wiki_articles.csv")
u = Upload.create(body: csv)
u.import!
# Give Sidekiq a moment to work through the batch, then:
u.reload
u.completed? # => true

And we can see that the upload was flagged completed when Sidekiq completed the batch successfully.
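
If the callback naming convention feels a bit magic, here's a toy model of it in plain Ruby. Everything in it (`ToyStatus`, `fire`, `UploadCallback`) is made up for illustration and is not Sidekiq Pro's code; it only shows the idea of mapping an event name like `:success` to an `on_success` method on a fresh instance of the callback class.

```ruby
# Toy status object; the real one is Sidekiq::Batch::Status.
ToyStatus = Struct.new(:total, :failures)

# Toy dispatcher (an assumption for illustration, not Sidekiq Pro's code):
# build an instance of the callback class and call on_<event> on it.
def fire(event, callback_class, status, options)
  callback_class.new.public_send("on_#{event}", status, options)
end

# Hypothetical callback class with the same method shape as our worker's on_success.
class UploadCallback
  def on_success(status, upload_id)
    "upload #{upload_id} completed (#{status.total} jobs)"
  end
end

message = fire(:success, UploadCallback, ToyStatus.new(2, 0), 42)
puts message
```

This prints `upload 42 completed (2 jobs)`. Sidekiq Pro also documents a `:complete` event, which fires once every job in the batch has run at least once whether or not it succeeded; under the same convention that one would call `on_complete`.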

Summary

That's it for now. To recap, we learned how to create a batch of jobs that run in parallel and then have Sidekiq Pro run a callback when all jobs are done. I think you'll find this really useful as you process more jobs and more complex job workflows.

Resources

Sample Spreadsheet

KnownBadUrl,https://thisinotavalidurl@nopenopenope
Ruby,https://en.wikipedia.org/wiki/Ruby_(programming_language)
Rust,https://en.wikipedia.org/wiki/Rust_(programming_language)