Joins in Scio

Scio is Spotify’s open-source Scala API for Apache Beam and Google Cloud Dataflow. It’s used by data engineers at Spotify to process many petabytes of data each day. Let’s look at the different joins it supports and how and when to use each.

Scio integrates with various data sources and formats: Avro, Parquet, Protobuf, BigQuery, Bigtable, Cassandra, Elasticsearch, RDBMSs that support JDBC, Spanner, and TensorFlow’s TFRecord. Whatever the origin, the data all ends up in what is referred to as an SCollection, a lightweight Scala wrapper around Apache Beam’s PCollection, a distributed data set. Although it says ‘collection’ on the box, it really is a DAG. It does not hurt to think of an SCollection as a List when developing pipelines, but it’s important to remember it’s really a computation graph. This means you cannot easily peek inside an SCollection, although there is debug(). Likewise, there is no first, last, or take(n: Int) method you can call, because an SCollection really is a graph, not a list.
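
To peek regardless, debug() can be attached at any point in the graph; it prints elements as they flow by at execution time. A minimal sketch, assuming a Scio context sc (more on this below):

sc.parallelize(Seq(1, 2, 3)).debug()

sc.close()

// prints 1, 2 and 3, in no particular order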

Because it plugs into the Apache Beam API, a unified SDK for various data processing frameworks, you can choose among different runners to execute the same pipeline.
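
As a sketch (the object name and the output argument are made up), a standalone pipeline selects its runner with the --runner pipeline option at launch time, for example --runner=DirectRunner for local testing or --runner=DataflowRunner for Google Cloud Dataflow:

import com.spotify.scio._

object MyPipeline {
  def main(cmdlineArgs: Array[String]): Unit = {
    // ContextAndArgs parses pipeline options such as --runner and
    // returns a Scio context plus the remaining custom arguments
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.parallelize(Seq("scio", "beam")).saveAsTextFile(args("output"))
    sc.close()
  }
}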

REPL

Scio comes with a REPL, which means you can use it to understand various concepts and operations. You can install it on a Mac by using the public Spotify Homebrew tap. If you’re not on a Mac, you can use one of the pre-built jars.

Inside the REPL you execute code as usual. A Scio context (sc) and BigQuery client (bq) have already been set up when the REPL fires up.

Let’s create a basic data structure and two sample instances of an SCollection for joining.

case class Data(k: Int, v: String)

val lhs = sc.parallelize((2 to 6).toList.map(i => Data(i, s"LEFT:$i"))).keyBy(_.k)
val rhs = sc.parallelize(List.range(1, 10, 2).map(i => Data(i, s"RIGHT:$i"))).keyBy(_.k)

The keyBy(f: T => K) method operates on an SCollection[T] and turns it into an SCollection[(K, T)], a collection of (key, value) tuples. You can achieve the same with a plain map( d => (d.k, d) ).
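
As a quick sketch, both forms produce the same keyed collection:

val viaKeyBy = sc.parallelize(List(Data(1, "a"))).keyBy(_.k)
val viaMap   = sc.parallelize(List(Data(1, "a"))).map(d => (d.k, d))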

In Scio, join conditions are not specified as an argument with a join expression, as in Spark. Instead, they are implied by creating key-value pairs that are joined on their keys. Functions for such key-value ‘collections’ are defined in PairSCollectionFunctions[K, V].

Inner Join

An inner join is simple enough:

val inner = lhs.join(rhs)

For an SCollection[(K, V)] and another SCollection[(K, W)] the result is of type SCollection[(K, (V, W))]. If you want to further process the data, a standard way to do it is with pattern matching: inner.map{ case (k, (v, w)) => ... }.
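
For instance, a minimal sketch that renders each matched pair as a string:

val pretty = inner.map { case (k, (l, r)) => s"$k: ${l.v} / ${r.v}" }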

Note that materialize is needed to see the output after the Scio context has been closed, that is, when the computation graph has been fully defined and is executed (lazily):

val result = inner.materialize

sc.close()

result.waitForResult().value.foreach(println)

As expected, this prints tuples of type (Int, (Data, Data)):

( 3, ( Data(3,LEFT:3), Data(3,RIGHT:3) ) )
( 5, ( Data(5,LEFT:5), Data(5,RIGHT:5) ) )

Because an SCollection represents a distributed data set, the order of elements is not deterministic unless forced. For convenience, I have ordered the results.

Outer Joins

To instantiate a fresh context, execute :newScio sc. Please note that you still have to recreate the case class and the data!

For outer joins, Scio produces Option[V] and/or Option[W]. In a left join, the return type is (K, (V, Option[W])). A right join produces (K, (Option[V], W)). And we obtain (K, (Option[V], Option[W])) for full outer joins.

Here is the syntax:

val left  = lhs.leftOuterJoin(rhs)
val right = lhs.rightOuterJoin(rhs)
val full  = lhs.fullOuterJoin(rhs)
val leftResult  = left.materialize
val rightResult = right.materialize
val fullResult  = full.materialize

sc.close()

leftResult.waitForResult().value.foreach(println)

// ( 2, ( Data(2,LEFT:2), None ) )
// ( 3, ( Data(3,LEFT:3), Some(Data(3,RIGHT:3)) ) )
// ( 4, ( Data(4,LEFT:4), None ) )
// ( 5, ( Data(5,LEFT:5), Some(Data(5,RIGHT:5)) ) )
// ( 6, ( Data(6,LEFT:6), None ) )

rightResult.waitForResult().value.foreach(println)

// ( 1, ( None, Data(1,RIGHT:1) ) )
// ( 3, ( Some(Data(3,LEFT:3)), Data(3,RIGHT:3) ) )
// ( 5, ( Some(Data(5,LEFT:5)), Data(5,RIGHT:5) ) )
// ( 7, ( None, Data(7,RIGHT:7) ) )
// ( 9, ( None, Data(9,RIGHT:9) ) )

fullResult.waitForResult().value.foreach(println)

// ( 1, ( None, Some(Data(1,RIGHT:1)) ) )
// ( 2, ( Some(Data(2,LEFT:2)), None ) )
// ( 3, ( Some(Data(3,LEFT:3)), Some(Data(3,RIGHT:3)) ) )
// ( 4, ( Some(Data(4,LEFT:4)), None ) )
// ( 5, ( Some(Data(5,LEFT:5)), Some(Data(5,RIGHT:5)) ) )
// ( 6, ( Some(Data(6,LEFT:6)), None ) )
// ( 7, ( None, Some(Data(7,RIGHT:7)) ) )
// ( 9, ( None, Some(Data(9,RIGHT:9)) ) )

Again, we can use pattern matching to process the data further:

full.map{
  case (k, (Some(v), Some(w))) => ...
  case (k, (Some(v), None)) => ...
  ...
}

Hash Joins: Side Inputs

Side inputs are small, in-memory data structures that are replicated to all workers. If you need to join a large data set with a tiny one, you can use a hash*Join on the large data set and treat the small data set as a side input. The syntax is essentially the same, so all you need to take care of is ensuring the smaller data set fits in memory:

lhs.hashJoin(rhs)
lhs.hashLeftJoin(rhs)      // NOTE no Outer!
lhs.hashFullOuterJoin(rhs) // NOTE with Outer!

As you can see, there is no hashRightJoin because the assumption is that the larger data set is on the left-hand side.
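
As a rough sketch (the sizes and data here are made up), the pattern looks like this:

val large = sc.parallelize((1 to 100000).toList).map(i => (i % 10, i))
val small = sc.parallelize(List((0, "zero"), (5, "five")))

// small is replicated to every worker as an in-memory side input,
// so large does not have to be shuffled
val hashed = large.hashJoin(small)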

Co-Group

The co-group operation is roughly a groupByKey followed by a full outer join. The output is of type SCollection[(K, (Iterable[V], Iterable[W]))]. If keys are duplicated in either the left or right data set, each key appears only once in the output, with all its values collected into the corresponding Iterable. The order of elements inside each Iterable is non-deterministic.

val cogroup = lhs.cogroup(rhs)

val cogroupResult = cogroup.materialize

sc.close()

cogroupResult.waitForResult().value.foreach(println)

This yields

( 1, ( List(), List(Data(1,RIGHT:1)) ) )
( 2, ( List(Data(2,LEFT:2)), List() ) )
( 3, ( List(Data(3,LEFT:3)), List(Data(3,RIGHT:3)) ) )
( 4, ( List(Data(4,LEFT:4)), List() ) )
( 5, ( List(Data(5,LEFT:5)), List(Data(5,RIGHT:5)) ) )
( 6, ( List(Data(6,LEFT:6)), List() ) )
( 7, ( List(), List(Data(7,RIGHT:7)) ) )
( 9, ( List(), List(Data(9,RIGHT:9)) ) )

Please notice the empty lists for the keys that only appear on either the left-hand side (LHS) or right-hand side (RHS).
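
To see the grouping of duplicated keys in action, here is a small sketch with made-up data:

val l = sc.parallelize(List((1, "a"), (1, "b"), (2, "c")))
val r = sc.parallelize(List((1, "x")))

val grouped = l.cogroup(r)

// roughly yields:
// ( 1, ( List(a, b), List(x) ) )
// ( 2, ( List(c), List() ) )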

MultiJoin

A MultiJoin is a join of up to 22 data sets. It works as follows:

import com.spotify.scio.util.MultiJoin

MultiJoin(lhs, rhs)         // lhs.join(rhs)
MultiJoin.left(lhs, rhs)    // lhs.leftOuterJoin(rhs)
MultiJoin.outer(lhs, rhs)   // lhs.fullOuterJoin(rhs)
MultiJoin.cogroup(lhs, rhs) // lhs.cogroup(rhs)

Additional data sets can be added as arguments.
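
For example, with a hypothetical third keyed data set thd, a three-way inner join reads:

val thd = sc.parallelize(List(Data(3, "THIRD:3"), Data(5, "THIRD:5"))).keyBy(_.k)

val threeWay = MultiJoin(lhs, rhs, thd) // SCollection[(Int, (Data, Data, Data))]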

Recommendations

All of the equi-joins above are implemented as shuffle joins, in which all input data is shuffled across workers; for these, the relative sizes of the data sets do not matter much for performance. Nevertheless, it is recommended to order data sets by descending size, because the non-shuffle joins (hash, skewed, and sparse joins) do require the largest data set to be on the left, whether in a chain of operators or in a MultiJoin.

If the RHS is small enough to fit in memory, a hashJoin, hashLeftJoin, or hashFullOuterJoin can be used. With a hash join, the RHS is replicated as a Map side input to all workers, so that there is no need to shuffle the LHS. The size of Google Cloud Dataflow worker memory can be tweaked if necessary.

A skewedJoin is preferable when the LHS has ‘hot keys’, that is, keys with many values, while the RHS’s keys are unique. The underlying fragmented-replicated join algorithm ‘fragments’ the RHS into a hot and a cold set based on which keys are hot on the LHS. After the fragmentation, the hot keys are joined using a hash join (‘replicated’); the cold set is joined with a regular shuffle join.
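
A minimal sketch with the default tuning (hot-key detection can be adjusted through additional parameters; depending on the Scio version, an Algebird CMSHasher for the key type may need to be in implicit scope):

val skewed = lhs.skewedJoin(rhs)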

When you need to perform a join small.leftOuterJoin(large), it’s best to flip the sides and use large.sparseRightOuterJoin(small). The various sparse joins are best suited when the RHS is much smaller than the LHS, but the RHS may not fit in memory. Internally, a bloom filter is built from the keys of the RHS and used as a side input to filter the LHS before the shuffle.
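
A sketch of the flipped call; the second argument is an estimate of the number of keys on the smaller side, used to size the bloom filter (the figure here is made up):

val sparse = large.sparseRightOuterJoin(small, 1000000L)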

Range (theta) joins, as well as fragmented-replicated joins with skew on both sides, are not yet supported out of the box. If you want more details, have a look at the examples or a recent deep dive.