alanqthomas.io

ZIO Striped Locking

My implementation of a striped lock using ZIO

Background

A striped lock is a concurrency mechanism that assigns locks to callers, which are divided into "stripes". Conceptually, it might be helpful to think of this as a map of a "key" type to "locks" e.g. Map[Int, Lock] where Lock is just some synchronization mechanism like a Semaphore.
The type of the key can be anything. The idea is that any effects requesting a lock with equivalent keys will always run independently of each other, i.e. never at the same time.
While it is guaranteed that equivalent keys will always point to the same lock, it is not guaranteed that non-equivalent keys will point to different locks. Put simply, two unrelated keys can end up on the same stripe, so effects that are not required to wait on each other could still be forced to do so.
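To make the two guarantees concrete, here is a minimal sketch of how a key might be mapped to a stripe. The names `StripeIndexSketch` and `stripeIndex` are made up for illustration, and the stripe count of 4 is deliberately tiny so that collisions are easy to see.

```scala
object StripeIndexSketch {
  // Hypothetical helper: maps any key to a stripe index in [0, stripes).
  def stripeIndex(key: Any, stripes: Int): Int =
    java.lang.Math.floorMod(key.hashCode(), stripes)

  def main(args: Array[String]): Unit = {
    val stripes = 4
    // Equivalent keys always land on the same stripe.
    println(stripeIndex(42, stripes) == stripeIndex(42, stripes)) // true
    // Non-equivalent keys can collide: 2 and 6 both map to stripe 2.
    println(stripeIndex(2, stripes)) // 2
    println(stripeIndex(6, stripes)) // 2
    // Other keys get a stripe of their own: 3 maps to stripe 3.
    println(stripeIndex(3, stripes)) // 3
  }
}
```

Keys 2 and 6 are unrelated, but with only 4 stripes they share a lock, which is exactly the "forced to wait anyway" case described above.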

Motivation

The reason you may want something like this (i.e. the reason I made it) is to guarantee atomicity of effects that could possibly work on the same resource. Consider the following example.
You need to run a task that synchronizes an internal resource to an external system/API whenever it gets updated internally. There is no guarantee of an atomic transaction with the external system like there would be if you were just communicating with a relational database that supports transactions (in which case you could just rely on the database to guarantee atomicity).
You would need to ensure only one such task is being executed at a time for any given internal resource. For example:
  1. An update to resource with id 42 is made
  2. Your system then starts executing a task to sync this internal resource to an external system (Task #1)
  3. While that update task is running, another update to resource with id 42 is made, triggering a new task (Task #2). This task should await the completion of the already-running task (Task #1) before starting
  4. Task #1 completes
  5. Task #2 begins execution and later completes
Technically, this could be trivially achieved by only letting one task for any resource run at a time, e.g. a simple queue of tasks or a semaphore with a single permit. However, that unnecessarily blocks parallel runs of tasks that do not compete with each other. In the above example, consider that in step 3, an update to a resource with a different id is made: there would be no reason to await the completion of Task #1 in that case, so the execution should proceed as normal.
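For comparison, the single-permit approach could be sketched roughly like this. It assumes ZIO 1.x (the version used in the rest of this post); `GlobalLockSketch`, `GlobalLock` and `syncResource` are hypothetical names, and `syncResource` is only a stand-in for the real sync task.

```scala
import zio._

object GlobalLockSketch {
  // Hypothetical name: one permit shared by every task, regardless of resource id.
  final class GlobalLock(semaphore: Semaphore) {
    def useLock[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
      semaphore.withPermit(effect)
  }

  def make: UIO[GlobalLock] = Semaphore.make(1).map(new GlobalLock(_))

  // Hypothetical stand-in for the task that syncs a resource externally.
  def syncResource(id: Int): UIO[Int] = ZIO.succeed(id)

  // Both tasks queue on the same permit even though ids 42 and 7 are unrelated.
  val program: UIO[List[Int]] =
    make.flatMap { lock =>
      ZIO.foreachPar(List(42, 7))(id => lock.useLock(syncResource(id)))
    }
}
```

This is correct but coarse: the task for id 7 has to wait for the task for id 42 even though they never touch the same resource.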
The tradeoffs between multiple approaches are described well in the documentation for Google's Guava implementation of a striped lock:
The crudest way to [associate a lock with an object] is to associate every key with the same lock, which results in the coarsest synchronization possible. On the other hand, you can associate every distinct key with a different lock, but this requires linear memory consumption and concurrency management for the system of locks itself, as new keys are discovered.
Striped allows the programmer to select a number of locks, which are distributed between keys based on their hash code. This allows the programmer to dynamically select a tradeoff between concurrency and memory consumption, while retaining the key invariant that if key1.equals(key2), then striped.get(key1) == striped.get(key2).
A single lock unnecessarily restricts concurrency, while one-lock-per-key is unnecessarily memory intensive, requires synchronization on the locking mechanism itself, and has essentially unbounded concurrency. The porridge locking mechanism that's just right, in this case, is a striped lock.

Code

Here's what the lock itself looks like:
import zio._

// A fixed pool of single-permit semaphores ("stripes"); a key's hash code selects the stripe.
class StripedLock(private val locksRef: Ref[Vector[Semaphore]]) {
  def useLock[R, E, A](value: => Any)(effect: ZIO[R, E, A]): ZIO[R, E, A] = {
    for {
      locks <- locksRef.get
      // floorMod keeps the index in [0, locks.length) for any hash code, including negative ones
      lock  <- ZIO.succeed(locks(java.lang.Math.floorMod(value.hashCode(), locks.length)))
      res   <- lock.withPermit(effect)
    } yield res
  }
}

object StripedLock {
  // Defaults to 1 << 10 = 1024 stripes
  def make(size: Int = 1 << 10): UIO[StripedLock] = {
    for {
      semaphores <- UIO.foreach((0 until size).toVector)(_ => Semaphore.make(1))
      ref        <- Ref.make(semaphores)
    } yield new StripedLock(ref)
  }
}
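One edge case to be aware of when turning a hash code into a vector index: `Int.MinValue.abs` overflows and is still `Int.MinValue`, so `hashCode().abs % n` can come out negative. With a power-of-two stripe count such as 1024 the remainder happens to be 0, so the default size is unaffected, but other sizes are not. The sketch below (hypothetical names `HashIndexSketch`, `absIndex` and `floorModIndex`) compares that with `Math.floorMod`, which always returns a non-negative result for a positive divisor.

```scala
object HashIndexSketch {
  // Index computed as hashCode().abs % stripes: can be negative for Int.MinValue.
  def absIndex(key: Any, stripes: Int): Int =
    key.hashCode().abs % stripes

  // Index computed with floorMod: always in [0, stripes) when stripes > 0.
  def floorModIndex(key: Any, stripes: Int): Int =
    java.lang.Math.floorMod(key.hashCode(), stripes)

  def main(args: Array[String]): Unit = {
    // Int.MinValue is used as the key because its hash code is Int.MinValue itself.
    println(absIndex(Int.MinValue, 1000))      // -648, not a valid index
    println(floorModIndex(Int.MinValue, 1000)) // 352
    println(absIndex(Int.MinValue, 1024))      // 0, power-of-two sizes hide the problem
  }
}
```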
Here's a small example that demonstrates its usage:
import zio._
import zio.console._
import zio.clock._
import zio.duration._
import zio.random._

object Main extends zio.App {
  def run(args: List[String]) = main.exitCode

  val main = {
    val Size = 10
    val MinTime = 2
    val MaxTime = 5

    def getKey(id: Int): Int = id % 2

    def longProcess(id: Int) = for {
      int <- nextIntBetween(MinTime, MaxTime)
      _   <- putStrLn(s"[${"+%02d".format(id)}] [${getKey(id)}] Starting – working for ${int}s ...")
      _   <- sleep(int.seconds)
      _   <- putStrLn(s"[${"-%02d".format(id)}] [${getKey(id)}] Complete")
    } yield ()

    for {
      lock <- StripedLock.make()
      _    <- ZIO.foreachParN(Size)(0 until Size)(id => lock.useLock(getKey(id))(longProcess(id)))
    } yield ()
  }
}
There are Size "jobs" all submitted in parallel, with their keys determined by id % 2, i.e. all odd ids share one lock and all even ids share another. We should expect at most one odd-numbered job and one even-numbered job to be running at any given time. You can change the implementation of getKey to test other key collision scenarios.

Don't sue me

I can't guarantee that this approach is completely correct/efficient/etc. I still consider myself a novice ZIO user so maybe I did something totally wrong (let me know if I did!).
I haven't done load testing so I can't speak to the scalability of this or the "correct" number of locks to use for any given use case. 1024 "just seemed like the right thing to do" (assuming hash codes are spread evenly, there is a 1/1024 chance that any two given keys share a lock when they didn't need to).
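The 1/1024 figure is for one pair of keys. A rough way to get a feel for a given stripe count is to ask how likely it is that at least two of the keys in flight at the same time share a stripe (the classic birthday-problem calculation). The sketch below assumes keys land on stripes uniformly at random, which real hash codes only approximate; `CollisionSketch` and `anyCollision` are made-up names.

```scala
object CollisionSketch {
  // Probability that at least two of `keys` distinct keys share a stripe,
  // assuming each key lands on one of `stripes` stripes uniformly at random.
  def anyCollision(keys: Int, stripes: Int): Double = {
    // Chance that every key gets a stripe no earlier key has taken.
    val allDistinct = (0 until keys).foldLeft(1.0) { (p, i) =>
      p * math.max(0.0, 1.0 - i.toDouble / stripes)
    }
    1.0 - allDistinct
  }

  def main(args: Array[String]): Unit = {
    // Two keys: this is the 1/1024 case.
    println(anyCollision(2, 1024))
    // More keys in flight at once make some sharing more likely.
    Seq(10, 50, 100).foreach { k =>
      println(s"$k keys -> ${anyCollision(k, 1024)}")
    }
  }
}
```

A shared stripe only costs some unnecessary waiting, never correctness, so this is a tuning knob rather than a safety concern.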

Inspiration and References