Lambda serialization

Lambda serialization is one of the more confusing issues in distributed data processing in Scala. No matter which framework you choose, whether it’s Scalding, Spark, Flink or Scio, sooner or later you’ll be hit by the dreaded NotSerializableException. In this post we’ll take a closer look at the common causes of this problem and their solutions.

Setup

To demonstrate the problem, first we need a minimal setup that mimics the behavior of a distributed data processing system. Anonymous functions, or lambdas, in such systems are serialized so that they can be distributed to workers for parallel processing. We start with a utility method that round-trips an object through Java serialization.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerDeUtil {
  def serDe[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }
}
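
As a quick sanity check, we can round-trip a lambda (this works in Scala 2.12, where lambdas are serializable by default):

val f = SerDeUtil.serDe((x: Int) => x + 1)
f(10) // => 11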

Next we create a bare-bones Collection[T] type that mimics an abstract distributed data set, akin to TypedPipe, RDD, or SCollection in Scalding, Spark or Scio respectively. Our implementation is backed by a local in-memory Seq[T] but does pass the function f through serialization like …
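
Here’s a minimal sketch of what that could look like (hypothetical code, since the full implementation is elided here):

// A local, in-memory "distributed" collection: map round-trips f through
// SerDeUtil.serDe, the way a runner ships closures to workers, so a
// non-serializable closure fails fast with NotSerializableException.
class Collection[T](private val items: Seq[T]) {
  def map[U](f: T => U): Collection[U] =
    new Collection(items.map(SerDeUtil.serDe(f)))
}

object Collection {
  def apply[T](items: T*): Collection[T] = new Collection(items)
}

With this in place, Collection(1, 2, 3).map(_ + 1) behaves just like a Seq, except that it fails fast on non-serializable functions.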

more ...

Lawfulness of aggregateByKey

I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful, Monoid-like use of aggregateByKey in Scio.

The Problem

Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:

// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]

// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]

And we have some business logic that looks like this:

case class Count(id: String, count: Int)

val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id)
  .aggregateByKey(z)(seqOp, combOp)

This code, however, only works correctly locally with the DirectRunner; it always produces results with id == "" when running on the Dataflow service with the DataflowRunner. Can you spot the bug?

Monoid laws

You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:

combOp(zeroValue …
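
To see why this particular zeroValue is problematic, here’s a quick check in plain Scala:

// the identity law requires combOp(zeroValue, x) == x for all x, but:
combOp(z, Count("a", 10)) // => Count("", 10), not Count("a", 10)

A distributed runner is free to merge a fresh accumulator (the zeroValue) as the left argument of combOp, which is exactly how the id gets clobbered to "".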
more ...

CanBuildFrom

We recently had an internal knowledge sharing on higher-kinded types and CanBuildFrom type classes in Scala. Here’s a short summary.

Basics

Let’s start by implementing map.

def map(xs: Seq[Int], f: Int => Double): Seq[Double] = xs.map(f)
map(Seq(1, 2, 3), _ + 0.1)

This implementation is not very good since it only works with Seq[Int] and Int => Double. It’s easy to parameterize Int and Double.

def map[A, B](xs: Seq[A], f: A => B): Seq[B] = xs.map(f)

However map(Seq(1, 2, 3), _ + 0.1) now fails to compile with the message missing parameter type for expanded function ((x$1) => x$1.$plus(0.1))

This is because inference of A in f: A => B depends on the type of xs: Seq[A], and Scala’s type inference does not flow between arguments within the same parameter list. A common workaround is to curry the arguments.

def map[A, B](xs: Seq[A])(f: A => B): Seq[B] = xs.map(f)
map(Seq(1, 2, 3))(_ + 0.1)

A similar pattern is commonly seen in Scala, e.g. foldLeft(z: B)(op: (B, A) => B). Another benefit is that we can now write f as a multi-line {} block more elegantly, as shown below.
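
For example, with the curried map above:

map(Seq(1, 2, 3)) { x =>
  val y = x + 0.1
  y * 2
}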

map …
more ...

Decompiling Scala code

I was bored today and decided to decompile some Scala code for fun and profit. I’m using Scala 2.12.2 and Java 1.8.0_121.

Scala object

package javap

object Test01 {
  def main(args: Array[String]): Unit = ()
}
And the javap output:

public final class javap.Test01$ {
  public static javap.Test01$ MODULE$;
  public static {};
  public void main(java.lang.String[]);
  private javap.Test01$();
}

public final class javap.Test01 {
  public static void main(java.lang.String[]);
}

As we can see, a Scala object is compiled into two Java classes: Test01, with static methods for Java compatibility, and Test01$, with a static instance of itself as MODULE$, so that Test01 can be used as an instance value in Scala.
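
To illustrate, referencing the object from Scala compiles to a read of that static field:

val t = Test01 // compiles to a GETSTATIC of javap.Test01$.MODULE$
t.main(Array.empty)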

Class constructors

package javap

class Test02(val x: Int, val y: Int, z: Int) {
  def this(x: Int, y: Int) = this(x, y, 0)
}
And the javap output:

public class javap.Test02 {
  private final int x;
  private final int y;
  public int x();
  public int y();
  public javap.Test02(int, int, int);
  public javap.Test02(int, int);
}

Looks like the primary constructor (val x: Int, val y: Int, z: Int) and the auxiliary one (x: Int, y: Int) each generated a Java constructor. However only x and y …

more ...

Implicits

In this post we’re going to take a closer look at Scala implicits and various use cases.

Basics

Let’s first look at the basics. There are three basic uses of implicits: as an argument, as a conversion method, and for enhancing an existing class, a.k.a. the “Pimp My Library” pattern.

Implicit arguments

Suppose we have a basic function like this.

def plus(x: Int) = x + 1
plus(10) // => 11

We can add a second argument and make it a curried function.

def plus(x: Int)(y: Int) = x + y
plus(10)(1) // => 11

We can then make the second argument implicit and supply it via an implicit val.

def plus(x: Int)(implicit y: Int) = x + y

implicit val one = 1
plus(10) // => 11

Since plus needs an implicit argument of type Int and there happens to be exactly one in scope, the val one is applied automatically. However, this won’t work if there are multiple implicit vals of the same type.

implicit val one = 1
implicit val two = 2
plus(10) // => ambiguous implicit values

This example isn’t very interesting, and one can usually use an argument with a default value instead. However implicit arguments are handy for decoupling behavior …
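
For comparison, the default value version mentioned above would look like this:

def plus(x: Int, y: Int = 1) = x + y
plus(10) // => 11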

more ...

Scio at Philly ETE

It’s been another 6 months since my talk about Scio at Scala by the Bay. We’ve seen huge adoption and improvements since then. The number of production Scio pipelines within Spotify has grown from ~70 to 400+. A lot of other companies are using and contributing to it as well. In the most recent edition of Spotify’s data university, an internal week-long big data training camp for non-data engineers, we revamped the curriculum to cover Scio, BigQuery and other Google Cloud big data products instead of Hadoop, Scalding and Hive.

And here’s a list of some notable improvements in Scio.

  • Master branch is now based on Apache Beam
  • Graduated the type-safe BigQuery API from experimental to stable
  • Sparkey side input support
  • TensorFlow TFRecord file IO
  • Cloud Pub/Sub attributes support
  • Named transformations for streaming update
  • Safe-guard against malformed tests and better error messages
  • Flexible custom IO wiring
  • KryoRegistrar for custom Kryo serialization
  • Table description for type-safe BigQuery
  • Lots of performance improvements and bug fixes

I talked about …

more ...

Joins

We recently started teaching Scio at Spotify’s internal data university, and I made these slides to explain how joins work along with some lower-level details.

more ...


Scio at Scala by the Bay

It’s been 7 months since we first announced Scio at GCPNEXT16. There are now dozens of internal teams and a couple of other companies using Scio to run hundreds of pipelines on a daily basis. Within Spotify, Scio is now the preferred framework for building new data pipelines on Google Cloud Platform. We’ve also made 19 releases and added tons of features and improvements. Below is a list of some notable ones.

  • Interactive REPL
  • Type-safe BigQuery macro improvements and Scio-IDEA-plugin
  • BigQuery standard SQL 2011 syntax support
  • HDFS source and sink
  • Avro file compression support
  • Bigtable multi-table sink and utility for cluster scaling
  • Protobuf file support and usability improvements
  • Accumulator usability improvements
  • End-to-end testing utilities and matchers improvements
  • Join performance improvements and skewed join
  • Metrics interface and enhancements

I talked about Scio at Scala by the Bay last week and here are the slides.

more ...