13. Concurrency with workers

Concurrency is hard. Fortunately for us the java.util.concurrent packages bring useful abstractions, data types and execution mechanisms to get concurrency "a little bit better".

Golo doesn’t provide a equivalent to the synchronized keyword of Java. This is on-purpose: when facing concurrency, we advise you to just use whatever is in java.util.concurrent.

That being said we provide a simple abstraction for concurrent executions in the form of workers. They pretty much resemble JavaScript web workers or isolates in Dart, albeit they do not really isolate the workers data space.

13.1. The big picture

A worker is simply a Golo function that can be executed concurrently. You can pass messages to a worker, and they are eventually received and handled by their target worker. In other words, workers react to messages in an asynchronous fashion.

Communications between a worker and some client code happens through ports. A port is simply an object that is responsible for dispatching a message to its worker.

Ports are obtained by spawning a worker function from a worker environment. Internally, a worker environment manages a java.util.concurrent executor, which means that you do not have to deal with thread management.

13.2. Worker environments

Worker environments are defined in the gololang.concurrent.workers.WorkerEnvironment class / module.

You can directly pass an instance of java.util.concurrent.ExecutorService to its constructor, or you may go through its builder object and call either of the following static methods:

  • withCachedThreadPool() uses a cached thread pool,
  • withFixedThreadPool(size) uses a fixed number of threads in a pool,
  • withFixedThreadPool() uses a pool with 1 thread per processor core,
  • withSingleThreadExecutor() uses a single executor thread.

In most scenarios withCachedThreadPool() is a safe choice, but as usual, your mileage varies. If you have many concurrent tasks to perform and they are not IO-bound, then withFixedThreadPool() is probably a better option. You should always measure, and remember that you can always pass a fine-tuned executor to the WorkerEnvironment() constructor.

Worker environments also provide delegate methods to their internal executor. It is important to call shutdown() to close the workers environment and release the threads pool. You can also call the awaitTermination, isShutdown and isTerminated methods whose semantics are exactly those of java.util.concurrent.ExecutorService.

13.3. Spawning a worker and passing messages

Worker functions take a single parameter which is the message to be received. To obtain a port, you need to call the spawn(target) function of a worker environment, as in:

let env = WorkerEnvironment.builder(): withFixedThreadPool()
let port = env: spawn(|message| -> println(">>> " + message))

A port provides a send(message) method:

port: send("hello"): send("world")

Messages are being put in a queue, and eventually dispatched to the function that we spawned.

13.4. A complete and useless example

To better understand how workers can be used, here is a (fairly useless) example:

module SampleWithWorkers

import java.lang.Thread
import java.util.concurrent
import gololang.concurrent.workers.WorkerEnvironment

local function pusher = |queue, message| -> queue: offer(message) (1)

local function generator = |port, message| { (2)
  foreach i in range(0, 100) {
    port: send(message)                      (3)
  }
}

function main = |args| {

  let env = WorkerEnvironment.builder(): withFixedThreadPool()
  let queue = ConcurrentLinkedQueue()

  let pusherPort = env: spawn(^pusher: bindTo(queue))
  let generatorPort = env: spawn(^generator: bindTo(pusherPort))

  let finishPort = env: spawn(|any| -> env: shutdown()) (4)

  foreach i in range(0, 10) {
    generatorPort: send("[" + i + "]")
  }
  Thread.sleep(2000_L)
  finishPort: send("Die!") (5)

  env: awaitTermination(2000)
  println(queue: reduce("", |acc, next| -> acc + " " + next))
}

In this example, we spawn 3 workers:

(2)

the first repeats a message 100 times,

(3)

…forwarding them to another one,

(1)

…that ultimately pushes them to a concurrent queue.

(5)

A message is sent to a final worker,

(4)

…that shuts the workers environment down.

As an aside, the example illustrates that worker functions may take further dependencies as arguments. The pusher function takes a queue target and generator needs a port.

You can satisfy dependencies by pre-binding function arguments, all you need is to make sure that each function passed to spawn only expects a single message as its argument, as in:

  • ^pusher: bindTo(queue), and
  • ^generator: bindTo(pusherPort), and
  • env: spawn(|any| -> env: shutdown()) where the worker function is defined as a closure, and implicitly captures its env dependency from the surrounding context.