Observables

Basics

Returning observables

You can return observables in your worker. It works fully transparent - just subscribe to the returned observable in the master code. The returned observable is based on the zen-observable implementation.

// master.js
import { spawn, Thread, Worker } from "threads"

const counter = await spawn(new Worker("./workers/counter"))

counter().subscribe(newCount => console.log(`Counter incremented to:`, newCount))
// workers/counter.js
import { Observable } from "observable-fns"
import { expose } from "threads/worker"

function startCounting() {
  return new Observable(observer => {
    for (let currentCount = 1; currentCount <= 10; currentCount++) {
      observer.next(currentCount)
    }
    observer.complete()
  })
}

expose(startCounting)

Hot observables

Note that in contrast to the default Observable behavior, the observable returned here is “hot”. That means that if you subscribe to it twice, the second subscription will mirror the first one, yielding the same values without subscribing to the data source a second time.

It will not replay values from the past, in case the second subscriber subscribes after the first one has already received values.

Observable subjects

As described earlier, we can always return observables from our workers. While observables usually isolate the code that create observable events from the surrounding code, we do provide a way to trigger updates to the observable “from the outside”.

Using Subject we can create objects that implement the Observable interface, allowing other code to .subscribe() to it, while also exposing .next(value), .complete() and .error(error), so we can trigger those observable updates “from outside”.

In a nutshell:

const observable = new Observable(observer => {
  // We can call `.next()`, `.error()`, `.complete()` only here
  // as they are only exposed on the `observer`
  observer.complete()
})

const subject = new Subject()
subject.complete()
// We are free to call `.next()`, `.error()`, `.complete()` from anywhere now
// Beware: With great power comes great responsibility! Don't write spaghetti code.

Subscribing still works the same:

const subscriptionOne = observable.subscribe(/* ... */)
subscriptionOne.unsubscribe()

const subscriptionTwo = subject.subscribe(/* ... */)
subscriptionTwo.unsubscribe()

To get a plain observable that proxies all values, errors, completion of the subject, but does not expose the .next(), … methods, use Observable.from():

// The returned observable will be read-only
return Observable.from(subject)

Streaming results

We can easily use observable subjects to stream results as they are computed.

// master.js
import { spawn, Thread, Worker } from "threads"

const minmax = await spawn(new Worker("./workers/minmax"))

minmax.values().subscribe(({ min, max }) => {
  console.log(`Min: ${min} | Max: ${max}`)
})

await minmax.add(2)
await minmax.add(3)
await minmax.add(4)
await minmax.add(1)
await minmax.add(5)
await minmax.finish()

await Thread.terminate(minmax)
// minmax.js
import { Observable, Subject } from "threads/observable"
import { expose } from "threads/worker"

let max = -Infinity
let min = Infinity

let subject = new Subject()

const minmax = {
  finish() {
    subject.complete()
    subject = new Subject()
  },
  add(value) {
    max = Math.max(max, value)
    min = Math.min(min, value)
    subject.next({ max, min })
  },
  values() {
    return Observable.from(subject)
  }
}

expose(minmax)

And there we go! A simple worker that keeps track of the minimum and maximum value passed to it, yielding observable updates we can subscribe to. The updated values will be streamed as they happen.