Lawfulness of aggregateByKey

I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful Monoid-like use of aggregateByKey in Scio.

The Problem

Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:

// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]

// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]
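
To make the roles of zeroValue, seqOp and combOp concrete, here is a minimal local sketch using plain Scala collections. The AggregateSketch object and its aggregate helper are hypothetical stand-ins, not Scio or Spark code; each inner Seq plays the role of one partition, and a real runner is free to partition and combine in a different order.

```scala
object AggregateSketch {
  // Hypothetical local model: fold every partition with seqOp starting from
  // zeroValue, then merge the per-partition results with combOp.
  def aggregate[V, U](partitions: Seq[Seq[V]])(zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): U =
    partitions
      .map(_.foldLeft(zeroValue)(seqOp))
      .foldLeft(zeroValue)(combOp)

  // Compute (sum, count) over three partitions, one of them empty.
  val sumAndCount: (Int, Int) =
    aggregate(Seq(Seq(1, 2), Seq(3), Seq.empty[Int]))((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

Note that in this model zeroValue can appear on either side of combOp, including for an empty partition.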

And we have some business logic that looks like this:

case class Count(id: String, count: Int)

val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id)
  .aggregateByKey(z)(seqOp, combOp)

This code, however, works correctly only when run locally with the DirectRunner, and always produces results with id == "" when run on the Dataflow service with the DataflowRunner. Can you spot the bug?

Monoid laws

You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:

combOp(zeroValue …
more ...
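
The identity law mentioned above can be checked directly against the z and combOp from the example. The sketch below runs on plain Scala values without Scio; lawfulCombOp is a hypothetical fix for illustration and not necessarily the one the full post arrives at. It also assumes that all values combined under one key share the same id.

```scala
object MonoidLawCheck {
  case class Count(id: String, count: Int)

  val z: Count = Count("", 0) // zeroValue from the example
  def combOp(x: Count, y: Count): Count = Count(x.id, x.count + y.count)

  val x: Count = Count("a", 110)

  // Right identity holds: the id is taken from the left argument, which is x.
  val rightIdentity: Boolean = combOp(x, z) == x
  // Left identity fails: the id is taken from z, giving Count("", 110).
  val leftIdentity: Boolean = combOp(z, x) == x

  // Hypothetical lawful variant: keep whichever id is non-empty.
  def lawfulCombOp(x: Count, y: Count): Count =
    Count(if (x.id.nonEmpty) x.id else y.id, x.count + y.count)

  val lawfulLeft: Boolean = lawfulCombOp(z, x) == x
  val lawfulRight: Boolean = lawfulCombOp(x, z) == x
}
```

Whenever a runner places an empty accumulator on the left of combOp, the original version yields id == "", which is consistent with the behaviour described above.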

CanBuildFrom

We recently had an internal knowledge-sharing session on higher-kinded types and CanBuildFrom type classes in Scala. Here’s a short summary.

Basics

Let’s start by implementing map.

def map(xs: Seq[Int], f: Int => Double): Seq[Double] = xs.map(f)
map(Seq(1, 2, 3), _ + 0.1)

This implementation is not very good since it only works with Seq[Int] and Int => Double. It’s easy to parameterize Int and Double.

def map[A, B](xs: Seq[A], f: A => B): Seq[B] = xs.map(f)

However, map(Seq(1, 2, 3), _ + 0.1) now fails to compile with the message "missing parameter type for expanded function ((x$1) => x$1.$plus(0.1))".

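This is because A in f: A => B can only be inferred from the type of xs: Seq[A], and a limitation of Scala's type inference is that it resolves type parameters one parameter list at a time, so A is still unknown when f is type-checked in the same list. A common workaround is to curry the arguments.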

def map[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)
map(Seq(1, 2, 3))(_ + 0.1)

A similar pattern is common in Scala, for example foldLeft(z: B)(op: (B, A) => B). Another benefit is that we can now write f as a multi-line {} block more elegantly.

map …
more ...
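
As a rough illustration of the curried form with a multi-line block (a sketch, not necessarily the example the full post continues with), the CurriedMap object below is a hypothetical wrapper around the map defined above, plus a foldLeft call that follows the same pattern.

```scala
object CurriedMap {
  // Same curried signature as above: A is fixed by the first parameter
  // list before f in the second list is type-checked.
  def map[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)

  // The function argument can be written as a multi-line block.
  val shifted: Seq[Double] = map(Seq(1, 2, 3)) { x =>
    val doubled = x * 2
    doubled + 0.5
  }

  // foldLeft from the standard library uses the same two-list shape.
  val total: Int = Seq(1, 2, 3).foldLeft(0)(_ + _)
}
```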

Decompiling Scala code

I was bored today and decided to decompile some Scala code for fun and profit. I’m using Scala 2.12.2 and Java 1.8.0_121.

Scala object

package javap

object Test01 {
  def main(args: Array[String]): Unit = ()
}

public final class javap.Test01$ {
  public static javap.Test01$ MODULE$;
  public static {};
  public void main(java.lang.String[]);
  private javap.Test01$();
}

public final class javap.Test01 {
  public static void main(java.lang.String[]);
}

As we can see, a Scala object is compiled to two Java classes: Test01, with static methods for Java compatibility, and Test01$, which holds a static instance of itself as MODULE$ so that Test01 can be used as an instance value in Scala.
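
The MODULE$ field can also be observed from Scala through Java reflection. This is only a sketch: Demo and ModuleFieldSketch are hypothetical names, and it assumes both are compiled as top-level objects in a regular source file. An object defined locally inside a method is compiled differently and would not expose this field.

```scala
object Demo {
  def greet: String = "hello"
}

object ModuleFieldSketch {
  // Runtime class of the singleton; its JVM name ends with "$".
  val moduleClass: Class[_] = Demo.getClass

  // Read the static MODULE$ field; no receiver is needed for a static field.
  val instance: AnyRef = moduleClass.getField("MODULE$").get(null)

  // The field holds the very same singleton we refer to as Demo in Scala.
  val sameInstance: Boolean = instance eq Demo
}
```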

Class constructors

package javap

class Test02(val x: Int, val y: Int, z: Int) {
  def this(x: Int, y: Int) = this(x, y, 0)
}

public class javap.Test02 {
  private final int x;
  private final int y;
  public int x();
  public int y();
  public javap.Test02(int, int, int);
  public javap.Test02(int, int);
}

It looks like the primary constructor (val x: Int, val y: Int, z: Int) and the auxiliary one (x: Int, y: Int) each generated a Java constructor. However only x and y …

more ...

Implicits

In this post we’re going to take a closer look at Scala implicits and various use cases.

Basics

Let’s first look at the basics. There are three basic uses of implicits: as an argument, as a conversion method, and to enhance an existing class, a.k.a. the “Pimp My Library” pattern.

Implicit arguments

Suppose we have a basic function like this.

def plus(x: Int) = x + 1
plus(10) // => 11

We can add a second argument and make it a curried function.

def plus(x: Int)(y: Int) = x + y
plus(10)(1) // => 11

We can then make the second argument implicit and supply it via an implicit val.

def plus(x: Int)(implicit y: Int) = x + y

implicit val one = 1
plus(10) // => 11

Since plus needs an implicit argument of type Int and there happens to be exactly one in scope, one is applied automatically. However, it won’t compile if there are multiple implicit vals of type Int in scope.

implicit val one = 1
implicit val two = 2
plus(10) // => ambiguous implicit values
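
Even with several implicit candidates in scope, the argument can still be passed explicitly, which bypasses implicit resolution altogether. A minimal sketch, with ImplicitArgs as a hypothetical wrapper object:

```scala
object ImplicitArgs {
  def plus(x: Int)(implicit y: Int): Int = x + y

  implicit val one: Int = 1
  implicit val two: Int = 2

  // plus(10) alone would not compile here: both one and two are candidates.
  // Supplying the second argument list explicitly avoids the ambiguity.
  val withOne: Int = plus(10)(one)
  val withTwo: Int = plus(10)(two)
}
```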

This example isn’t very interesting, and one can usually use an argument with a default value instead. However implicit arguments are handy for decoupling behavior …

more ...
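
The excerpt stops before reaching the other two uses listed under Basics. A minimal sketch of each follows; Meters, doubleToMeters, describe and IntOps are hypothetical names chosen for illustration, not taken from the full post.

```scala
import scala.language.implicitConversions

object ImplicitUses {
  final case class Meters(value: Double)

  // Implicit conversion method: lets a Double be used where Meters is expected.
  implicit def doubleToMeters(d: Double): Meters = Meters(d)

  def describe(m: Meters): String = s"${m.value} m"
  val converted: String = describe(2.5) // the compiler inserts doubleToMeters(2.5)

  // Enhancing an existing class: an implicit class adds a method to Int.
  implicit class IntOps(n: Int) {
    def squared: Int = n * n
  }
  val nine: Int = 3.squared
}
```

Implicit conversions between unrelated types can make code harder to follow, so the implicit class form is usually the safer choice when the goal is only to add methods.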

Joins

We recently started teaching Scio at Spotify’s internal data university and I made these slides to explain how joins work and some lower level details.

more ...





Scala Workshop

While there are many Scala tutorials and books available, very few of them focus on big data. I did a couple of workshops at Spotify focusing on this area, and here are the slides.

more ...