Automatic type-class derivation with Shapeless

Date Category code Tags scala / fp

We recently had a knowledge sharing session at work on using Shapeless for automatic type class derivation. Here is a short write-up on the topic.

Scala List

First let’s review how List works in Scala. A List is a linked list with a head and a tail, plus Nil for the empty list. It can be represented with the following algebraic data type:

sealed trait List[+A] {
  def ::[B >: A](head: B): List[B] = Cons(head, this)
}
case object Nil extends List[Nothing] // Nothing is a sub-type of every other type
case class Cons[+A](head: A, tail: List[A]) extends List[A]

Notice that ::, the list prepend (cons) operation, is just a method on trait List[+A]. Since Scala operators that end with : are right-associative, we can conveniently create lists by chaining multiple ::s. Therefore the following expressions are equivalent:

1 :: 2 :: Nil
1 :: (2 :: Nil)
Nil.::(2).::(1)
Cons(1, Cons(2, Nil))

It’s important to point out here that Scala List is homogeneous, i.e. it has a single type parameter A and thus can only store elements of A and its sub-types. On the other hand, it can have varying numbers of elements at runtime.
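As a small illustration of the point above, here is a sketch using the standard library List (not the simplified definition shown earlier). The object name ListSketch is made up for this example. Mixing element types only compiles because the element type widens to a common supertype, here annotated explicitly as Any.

```scala
object ListSketch {
  // Right-associative :: and its desugared form build the same list.
  val ints: List[Int] = 1 :: 2 :: Nil
  val desugared: List[Int] = Nil.::(2).::(1)

  // A List has one element type; mixing Int and String widens it to Any,
  // so the individual element types are no longer tracked statically.
  val mixed: List[Any] = 1 :: "two" :: Nil

  // The number of elements is not part of the type: both are List[Int].
  val longer: List[Int] = 0 :: ints
}
```

Getting the Int or String back out of mixed requires a pattern match or a cast, which is the limitation a heterogeneous list is meant to address.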

Shapeless HList

Since List …


Lambda serialization

Lambda serialization is one of the more confusing issues in distributed data processing in Scala. No matter which framework you choose, whether it’s Scalding, Spark, Flink or Scio, sooner or later you’ll be hit by the dreaded NotSerializableException. In this post we’ll take a closer look at the common causes of and solutions to this problem.

Setup

To demonstrate the problem, first we need a minimal setup that mimics the behavior of a distributed data processing system. We start with a utility method that round-trips an object through Java serialization. Anonymous functions, or lambdas, in such systems are serialized so that they can be distributed to workers for parallel processing.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerDeUtil {
  def serDe[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }
}
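To make the failure mode concrete, here is a minimal sketch of one common way to trigger it: a lambda that captures a local value whose class is not Serializable. The names SerDeSketch, Offset and capturingLambdaFails are hypothetical, and the serDe method is repeated only so the sketch is self-contained.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

object SerDeSketch {
  // Same round trip as SerDeUtil.serDe, repeated for self-containment.
  def serDe[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  // Hypothetical helper class; deliberately NOT Serializable.
  class Offset(val n: Int)

  // Builds a lambda that captures a local non-serializable value and
  // reports whether serializing it fails with NotSerializableException.
  def capturingLambdaFails(): Boolean = {
    val offset = new Offset(1) // local value, captured by the closure
    val f: Int => Int = x => x + offset.n
    Try(serDe(f)).failed.toOption.exists(_.isInstanceOf[NotSerializableException])
  }
}
```

The value is kept local to a method on purpose: a field of a top-level object is reached through the object itself rather than captured, so it would not demonstrate the same problem.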

Next we create a bare-minimum Collection[T] type that mimics an abstract distributed data set, akin to TypedPipe, RDD, or SCollection in Scalding, Spark or Scio respectively. Our implementation is backed by a local in-memory Seq[T] but does pass the function f through serialization like …


Lawfulness of aggregateByKey

I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful Monoid-like use of aggregateByKey in Scio.

The Problem

Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:

// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]

// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]

And we have some business logic that looks like this:

case class Count(id: String, count: Int)

val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id) // (id, Count) pairs, so that seqOp sees one Count at a time
  .aggregateByKey(z)(seqOp, combOp)

However, this code only works correctly when run locally with the DirectRunner. On the Dataflow service with the DataflowRunner, it always produces results with id == "". Can you spot the bug?

Monoid laws

You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:

combOp(zeroValue …
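As a quick sanity check, the identity law can be tested against the definitions from the business logic above. This is only a sketch: the object name IdentityLawCheck and the two Boolean values are made up for illustration, and the standard two-sided monoid identity law is assumed.

```scala
case class Count(id: String, count: Int)

object IdentityLawCheck {
  // zeroValue and combOp exactly as defined in the business logic above.
  val z = Count("", 0)
  def combOp(x: Count, y: Count): Count = Count(x.id, x.count + y.count)

  val x = Count("a", 10)

  // Left identity: combOp(z, x) keeps z.id, giving Count("", 10), not x.
  val leftIdentityHolds: Boolean = combOp(z, x) == x

  // Right identity: combOp(x, z) keeps x.id, giving Count("a", 10).
  val rightIdentityHolds: Boolean = combOp(x, z) == x
}
```

Here leftIdentityHolds is false while rightIdentityHolds is true, so the result depends on which side zeroValue ends up on when partial results are combined.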

CanBuildFrom

We recently had an internal knowledge sharing session on higher-kinded types and CanBuildFrom type classes in Scala. Here’s a short summary.

Basics

Let’s start by implementing map.

def map(xs: Seq[Int], f: Int => Double): Seq[Double] = xs.map(f)
map(Seq(1, 2, 3), _ + 0.1)

This implementation is not very good since it only works with Seq[Int] and Int => Double. It’s easy to parameterize Int and Double.

def map[A, B](xs: Seq[A], f: A => B): Seq[B] = xs.map(f)

However, map(Seq(1, 2, 3), _ + 0.1) now fails to compile with the message: missing parameter type for expanded function ((x$1) => x$1.$plus(0.1))

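This is because inferring A in f: A => B depends on the type of xs: Seq[A], and the Scala 2 compiler infers type parameters one parameter list at a time, so what it learns from xs is not yet available when it types f in the same list. A common workaround is to curry the arguments.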

def map[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)
map(Seq(1, 2, 3))(_ + 0.1)
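For comparison, here is a sketch that puts both signatures side by side under the hypothetical names mapUncurried and mapCurried. With the single parameter list, the call compiles once the lambda's parameter type is written out, because nothing about A has to be inferred from xs. The curried form needs no annotation.

```scala
object MapSketch {
  // Single parameter list: A must be known before f can be typed.
  def mapUncurried[A, B](xs: Seq[A], f: A => B): Seq[B] = xs.map(f)

  // Two parameter lists: A is fixed by xs before f is typed.
  def mapCurried[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)

  // Works because the parameter type is spelled out explicitly.
  val viaAnnotation: Seq[Double] = mapUncurried(Seq(1, 2, 3), (x: Int) => x + 0.1)

  // Works with the placeholder syntax thanks to currying.
  val viaCurrying: Seq[Double] = mapCurried(Seq(1, 2, 3))(_ + 0.1)
}
```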

A similar curried signature is common in Scala, like foldLeft(z: B)(op: (B, A) => B). Another benefit is that we can now write f in a multi-line {} block more elegantly.

map …

Decompiling Scala code

I was bored today and decided to decompile some Scala code for fun and profit. I’m using Scala 2.12.2 and Java 1.8.0_121.

Scala object

package javap

object Test01 {
  def main(args: Array[String]): Unit = ()
}

public final class javap.Test01$ {
  public static javap.Test01$ MODULE$;
  public static {};
  public void main(java.lang.String[]);
  private javap.Test01$();
}

public final class javap.Test01 {
  public static void main(java.lang.String[]);
}

As we can see, a Scala object is compiled to two Java classes: Test01, with static methods for Java compatibility, and Test01$, with a static instance of itself as MODULE$, so that Test01 can be used as an instance value in Scala.

Class constructors

package javap

class Test02(val x: Int, val y: Int, z: Int) {
  def this(x: Int, y: Int) = this(x, y, 0)
}

public class javap.Test02 {
  private final int x;
  private final int y;
  public int x();
  public int y();
  public javap.Test02(int, int, int);
  public javap.Test02(int, int);
}

It looks like the primary constructor (val x: Int, val y: Int, z: Int) and the auxiliary one (x: Int, y: Int) each generated a Java constructor. However only x and y …


Implicits

In this post we’re going to take a closer look at Scala implicits and various use cases.

Basics

Let’s first look at the basics. There are three basic uses of implicits: as an argument, as a conversion method, and for enhancing an existing class, a.k.a. the “Pimp My Library” pattern.

Implicit arguments

Suppose we have a basic function like this.

def plus(x: Int) = x + 1
plus(10) // => 11

We can add a second argument and make it a curried function.

def plus(x: Int)(y: Int) = x + y
plus(10)(1) // => 11

We can then make the second argument implicit and supply it via an implicit val.

def plus(x: Int)(implicit y: Int) = x + y

implicit val one = 1
plus(10) // => 11

Since plus needs an implicit argument of type Int and there happens to be one in the scope, one is applied automatically. However it won’t work if there are multiple implicit vals.

implicit val one = 1
implicit val two = 2
plus(10) // => ambiguous implicit values

This example isn’t very interesting and one can usually use an argument with a default value instead. However implicit arguments are handy for decoupling behavior …
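For reference, the default-value alternative mentioned above could look like the following sketch. The names DefaultArgSketch and plusDefault are hypothetical.

```scala
object DefaultArgSketch {
  // y falls back to 1 when the caller does not supply it.
  def plusDefault(x: Int, y: Int = 1): Int = x + y

  val usesDefault: Int = plusDefault(10)     // y = 1
  val overridden: Int = plusDefault(10, 5)   // y = 5
}
```

Unlike an implicit, the default is fixed at the definition site, so the caller's scope cannot change it.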


Scio at Philly ETE

It’s been another 6 months since my talk about Scio at Scala by the Bay. We’ve seen huge adoption and improvements since then. The number of production Scio pipelines has grown from ~70 to 400+ within Spotify. A lot of other companies are using and contributing to it as well. In the most recent edition of the Spotify data university, an internal week-long big data training camp for non-data engineers, we revamped the curriculum to cover Scio, BigQuery and other Google Cloud Big Data products instead of Hadoop, Scalding and Hive.

And here’s a list of some notable improvements in Scio.

  • Master branch is now based on Apache Beam
  • Graduated type-safe BigQuery API from experimental to stable
  • Sparkey side input support
  • TensorFlow TFRecord file IO
  • Cloud Pub/Sub attributes support
  • Named transformations for streaming update
  • Safe-guard against malformed tests and better error messages
  • Flexible custom IO wiring
  • KryoRegistrar for custom Kryo serialization
  • Table description for type-safe BigQuery
  • Lots of performance improvements and bug fixes

I talked about …


Joins

We recently started teaching Scio at Spotify’s internal data university and I made these slides to explain how joins work and some lower level details.
