ActiveRecord find_or_create_by and concurrency - how to handle it?

ActiveRecord provides a bunch of useful methods. One of them is find_or_create_by, which is cool because it saves us from writing an IF statement. Unfortunately, as in life, what is easy to use isn’t always optimal, and under certain conditions we may run into problems. Code that works well on a single thread won’t necessarily work correctly in parallel.

Case:

Imagine that we have a simple AR model:

class CounterCache < ApplicationRecord; end
class CreateCounterCaches < ActiveRecord::Migration[5.2]
  def change
    create_table :counter_caches do |t|
      t.string :key, null: false
      t.integer :value, default: 0, null: false

      t.timestamps
    end
  end
end

Then we use this model in a Sidekiq worker that runs in parallel:

class CounterCacheWorker
  include Sidekiq::Worker

  def perform(key)
    CounterCache.find_or_create_by!(key: key).touch
  end
end

Here find_or_create_by! creates a new record if Postgres doesn’t find a record with the given key. Then we simply touch the record, but that isn’t important - we want to focus on find_or_create_by.

Do you see any issues with the above code? It may look good at first glance, but it won’t work reliably under concurrency, because find_or_create_by is not atomic.

We can define an atomic operation as any operation that is uninterruptible. - Robert C. Martin, Clean code

As the name of the method indicates, we have 2 operations to perform - find and create (if the record doesn’t exist). Let’s see what is going on under the hood:

# File activerecord/lib/active_record/relation.rb, line 211
def find_or_create_by(attributes, &block)
  find_by(attributes) || create(attributes, &block)
end

As we can see, if the record exists, find_or_create_by returns the result of the SELECT query; if not, it executes a second query, an INSERT. Because of that, there is a chance that our parallel workers will create duplicate records (with the same key). This is called a race condition - when several threads or processes run these queries at the same time, each can SELECT at a moment when the record doesn’t exist yet and then INSERT or UPDATE based on that stale result.

Time ASC   Thread 1     Thread 2
           Read data
                        Read data
                        Write data
           Write data
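The interleaving in the table can be reproduced without a database. The sketch below is only a model of the problem: FakeTable, reads_done and go_ahead are hypothetical names, and the two queues exist purely to force both "SELECTs" to happen before either "INSERT".

```ruby
# In-memory stand-in for the counter_caches table (hypothetical, no database involved).
class FakeTable
  attr_reader :rows

  def initialize
    @rows = []
    @mutex = Mutex.new
  end

  # Plays the role of the SELECT.
  def find_by(key)
    @mutex.synchronize { @rows.find { |row| row[:key] == key } }
  end

  # Plays the role of the INSERT.
  def create(key)
    @mutex.synchronize do
      row = { key: key }
      @rows << row
      row
    end
  end
end

table      = FakeTable.new
reads_done = Queue.new # each thread reports here once its read has finished
go_ahead   = Queue.new # the main thread releases the writes afterwards

threads = 2.times.map do
  Thread.new do
    found = table.find_by("books")  # step 1: read, finds nothing
    reads_done << true
    go_ahead.pop                    # wait until the other thread has read too
    found || table.create("books")  # step 2: write, based on a stale read
  end
end

2.times { reads_done.pop }   # both reads have happened, neither saw a row
2.times { go_ahead << true } # now let both threads write
threads.each(&:join)

puts table.rows.size # → 2
```

Each individual operation is thread-safe here, yet the combination of the two is not, which is exactly the situation find_or_create_by is in.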

First attempt

I wrote a simple script which I’ll use for testing:

10.times { CounterCacheWorker.perform_at(30.seconds.from_now, "books") }

Result:


Using ActiveRecord’s find_or_create_by, we get duplicate records - that was easy to predict. Let’s have fun trying different ways to solve the problem!

Pure PostgreSQL find_or_create_by query


I added a new method to our CounterCache model:

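class << self
  # Inserts a row for the given key only if no row with that key exists yet.
  def select_or_insert_key(key)
    quoted_key = connection.quote(key) # escape the value instead of interpolating it raw
    # t.timestamps columns are NOT NULL, so they have to be set explicitly here
    sql = <<-SQL
      INSERT INTO #{quoted_table_name} ("key", "created_at", "updated_at")
      SELECT #{quoted_key}, NOW(), NOW()
      WHERE NOT EXISTS (SELECT 1 FROM #{quoted_table_name} WHERE "key" = #{quoted_key});
    SQL
    connection.execute(sql)
  end
end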

So, the scenario I want to check is based on the assumption that finding a record and creating a new one in a database will be fast enough to avoid duplication.

Result:


It’s better - we haven’t got any duplicates, but we have to remember that this method is still not atomic. Let’s try again, but now I’ll change the number of threads running at the same time from 2 to 10:


The conclusion is simple - with more queries running in parallel, INSERT ... WHERE NOT EXISTS doesn’t prevent duplicates.

PostgreSQL lock

What about locking the table?

We can achieve it quite easily:

class << self
  def find_or_create_by_with_lock!(attributes, &block)
    ActiveRecord::Base.transaction do
      ActiveRecord::Base.connection.execute("LOCK #{table_name} IN ACCESS EXCLUSIVE MODE")
      find_or_create_by!(attributes, &block)
    end
  end
end

I wrote the find_or_create_by_with_lock! method, which lets me use AR’s find_or_create_by! inside a transaction that holds a lock IN ACCESS EXCLUSIVE MODE. This is the strictest of all PostgreSQL lock modes: while one transaction holds it, PostgreSQL doesn’t allow any other transaction to read from or write to the table.
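The effect of such a lock can be sketched in plain Ruby, with a Mutex playing the role of the table lock. This is only an analogy under stated assumptions: rows, table_lock and find_or_create_with_lock are hypothetical names, and no database is involved.

```ruby
# Hypothetical in-memory table; the Mutex stands in for the ACCESS EXCLUSIVE lock.
rows       = []
table_lock = Mutex.new

find_or_create_with_lock = lambda do |key|
  table_lock.synchronize do # only one thread at a time gets past this line
    row = rows.find { |r| r[:key] == key }
    unless row
      row = { key: key }
      rows << row
    end
    row
  end
end

threads = 10.times.map { Thread.new { find_or_create_with_lock.call("books") } }
results = threads.map(&:value) # joins every thread and collects its return value

puts rows.size                                     # → 1
puts results.all? { |row| row.equal?(rows.first) } # → true
```

Because the read and the write happen inside the same critical section, no thread can act on a stale read. The price is that every caller waits in line, which is the cost the lock has in the database as well.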

And it works! But it also has a side effect - performance. How big? Let’s measure:

Benchmark.measure do
  1000.times { CounterCache.find_or_create_by!(key: "books") }
end

=> total time: 2.51s

Benchmark.measure do
  1000.times { CounterCache.find_or_create_by_with_lock!(key: "books") }
end

=> total time: 6.12s

The sad thing is that it’s about 2.4 times slower! And if we run find_or_create_by_with_lock! in parallel, it gets even slower, because the threads block each other.

Unique index

We’re getting closer to success. We can put a unique index on a table column, or even on a group of columns. Thanks to that, our database prevents duplicates and raises a unique violation error.

class AddIndexForCounterCaches < ActiveRecord::Migration[5.2]
  def change
    add_index :counter_caches, :key, unique: true
  end
end

As a result we haven’t got any duplicates, but we have 9 errors in Sidekiq:

ActiveRecord::RecordNotUnique: PG::UniqueViolation: ERROR: duplicate key value violates unique constraint "index_counter_caches_on_key" DETAIL: Key (key)=(books) already exists. : INSERT INTO "counter...

Quick fix:

def perform(key, logger = Rails.logger)
  CounterCache.find_or_create_by!(key: key).touch
rescue ActiveRecord::RecordNotUnique => exception
  # Another worker inserted the same key between our SELECT and INSERT
  logger.warn("Skipping an existing counter cache: #{exception.message}")
  # Here we can add logic to perform whatever we need
end

We Ruby developers know that raising exceptions is expensive, but we need to remember that ActiveRecord itself can be even more expensive, so I think using a unique index is a good way to solve our find_or_create_by problem.
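One thing the rescue branch could do is simply read the record again, since the error means another worker has just created it. The sketch below models that idea in plain Ruby. Everything in it is a hypothetical stand-in: DuplicateKey plays the role of ActiveRecord::RecordNotUnique, UniqueTable the role of a table with a unique index, and find_or_create_with_retry is not an ActiveRecord method.

```ruby
# Hypothetical stand-in for ActiveRecord::RecordNotUnique.
class DuplicateKey < StandardError; end

# Hypothetical stand-in for a table with a unique index on "key".
class UniqueTable
  attr_reader :rows

  def initialize
    @rows = {}
    @mutex = Mutex.new
  end

  def find_by(key)
    @mutex.synchronize { @rows[key] }
  end

  # The uniqueness check and the write are one atomic step, as a unique index guarantees.
  def create!(key)
    @mutex.synchronize do
      raise DuplicateKey, "Key (key)=(#{key}) already exists." if @rows.key?(key)

      @rows[key] = { key: key }
    end
  end
end

def find_or_create_with_retry(table, key)
  table.find_by(key) || table.create!(key)
rescue DuplicateKey
  # Another thread won the race between our find and our create: read its row.
  table.find_by(key)
end

table   = UniqueTable.new
threads = 10.times.map { Thread.new { find_or_create_with_retry(table, "books") } }
results = threads.map(&:value)

puts table.rows.size      # → 1
puts results.compact.size # → 10
```

With this shape every caller ends up with the single existing record, whether it created the record itself or lost the race.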

I want more fun!

Will we be able to avoid raising exceptions? Let’s find out!

We know that AR’s find_or_create_by isn’t atomic, locks aren’t efficient, and we don’t want to raise an exception. In my opinion we should focus on the database’s capabilities. Is it possible at all?

So, let’s check what we can do with PG INSERT. I see some interesting options there:

INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER} VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]

where conflict_target can be one of:

    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_name

and conflict_action is one of:

    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]

Maybe we can use the ON CONFLICT option. After some quick research I found a library which we can use for testing.

First of all, we must indicate which column a conflict may occur on. We can do that by adding this to the model:

upsert_keys :key

In the next step we have to update our worker:

class CounterCacheWorker
  include Sidekiq::Worker

  def perform(key)
    CounterCache.upsert!(key: key)
  end
end

And run it. Bang, it works! Each job now issues a single INSERT ... ON CONFLICT query.

Postgres tried to create a new record, but because there was already a record with the key “books”, instead of raising an error it updated that record’s fields (in our case there is nothing that really needs updating). I think that upserts can be useful in situations where we care about the latest record value, which isn’t calculated from its previous versions (for example, increment still won’t work, but that’s a topic for another blog post).
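For orientation, an upsert for our table has roughly the shape below. This is a hand-written sketch, not the SQL the library actually generates: upsert_sql is a hypothetical helper, the quoting is deliberately naive, and the statement assumes the unique index on key from the previous section.

```ruby
# Hypothetical helper that builds an upsert statement for the counter_caches table.
def upsert_sql(key, now: Time.now.utc)
  quoted_key = "'" + key.gsub("'", "''") + "'" # naive quoting, for illustration only
  timestamp  = "'" + now.strftime("%Y-%m-%d %H:%M:%S") + "'"

  <<~SQL
    INSERT INTO "counter_caches" ("key", "created_at", "updated_at")
    VALUES (#{quoted_key}, #{timestamp}, #{timestamp})
    ON CONFLICT ("key")
    DO UPDATE SET "updated_at" = EXCLUDED."updated_at"
    RETURNING *
  SQL
end

puts upsert_sql("books", now: Time.utc(2019, 3, 18, 12, 0, 0))
```

EXCLUDED refers to the row that was proposed for insertion, so on a conflict the existing row takes over the new updated_at value. The whole decision happens inside one statement, which is why no exception reaches the worker.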

Written on March 18, 2019