Lambda serialization

Lambda serialization is one of the more confusing issues in distributed data processing in Scala. No matter which framework you choose, whether it’s Scalding, Spark, Flink or Scio, sooner or later you’ll be hit by the dreaded NotSerializableException. In this post we’ll take a closer look at the common causes of this problem and their solutions.

Setup

To demonstrate the problem, we first need a minimal setup that mimics the behavior of a distributed data processing system. Anonymous functions, or lambdas, in such systems are serialized so that they can be distributed to workers for parallel processing. We start with a utility method that round-trips an object through Java serialization.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerDeUtil {
  def serDe[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }
}
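
As a quick illustration of what such a round trip can reveal, here is a minimal sketch that is not part of the original post. The `SerDeDemo` object, the `Multiplier` class and its two methods are hypothetical names made up for this example, and `serDe` is repeated only so the block stands on its own. It assumes Scala 2.12 or later, where function literals are serializable as long as everything they capture is serializable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

object SerDeDemo {
  // Same round trip as SerDeUtil.serDe, repeated so this sketch is self-contained.
  def serDe[T](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  // Hypothetical class that is deliberately NOT Serializable.
  class Multiplier(val factor: Int) {
    // Reads `factor` through `this`, so the lambda captures the whole Multiplier.
    def capturesThis: Int => Int = x => x * factor

    // Copies the field into a local val first, so only an Int is captured.
    def capturesLocal: Int => Int = {
      val f = factor
      x => x * f
    }
  }

  // True if the function survives Java serialization, false if the round trip throws.
  def roundTripWorks(f: Int => Int): Boolean = Try(serDe(f)).isSuccess
}
```

Under these assumptions, `roundTripWorks(new Multiplier(2).capturesThis)` should be false because writing the captured `Multiplier` fails with a NotSerializableException, while the function returned by `capturesLocal` round-trips and still multiplies by the copied factor.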

Next we create a bare-minimum Collection[T] type that mimics an abstract distributed data set, akin to TypedPipe, RDD, or SCollection in Scalding, Spark, or Scio respectively. Our implementation is backed by a local in-memory Seq[T] but does pass the function f through serialization like …

more ...

Lawfulness of aggregateByKey

I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful Monoid-like use of aggregateByKey in Scio.

The Problem

Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:

// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]

// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]
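
To make the roles of zeroValue, seqOp and combOp concrete, here is a rough local sketch over plain Scala collections. It is an illustration only, not how Scio or Spark implement these transformations: `LocalAggregate` and its explicit `partitions` argument are hypothetical, and the sketch simply assumes that each partition is folded with seqOp starting from zeroValue and that the partial results are then merged with combOp, again starting from zeroValue.

```scala
object LocalAggregate {
  // Hypothetical local stand-in for aggregate: `partitions` plays the role of
  // the chunks of data that a runner would hand to different workers.
  def aggregate[V, U](partitions: Seq[Seq[V]])(zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): U = {
    // Step 1: fold the elements of each partition into an accumulator of type U.
    val partials: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // Step 2: merge the per-partition accumulators into a single result.
    partials.foldLeft(zeroValue)(combOp)
  }

  // Usage: compute (sum, count) over two partitions in a single pass.
  val sumAndCount: (Int, Int) =
    aggregate(Seq(Seq(1, 2), Seq(3, 4)))((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

Note that in this sketch zeroValue takes part in both steps, so the result depends on how zeroValue behaves under seqOp and under combOp, not just on the input elements.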

And we have some business logic that looks like this:

case class Count(id: String, count: Int)

val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id)
  .aggregateByKey(z)(seqOp, combOp)

This code, however, works correctly only when run locally with the DirectRunner. On the Dataflow service with the DataflowRunner, it always produces results with id == "". Can you spot the bug?

Monoid laws

You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:

combOp(zeroValue …
more ...
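
As a small sketch of how the identity law can be checked for the definitions in this post, the block below copies `Count`, `z` and `combOp` from the example and tests both sides of the law for a sample value. The `IdentityLawCheck` object and its two helper methods are hypothetical names added for illustration, and the sketch assumes the usual two-sided monoid identity law, `combOp(zero, x) == x` and `combOp(x, zero) == x`.

```scala
object IdentityLawCheck {
  // Copied from the business logic in this post.
  case class Count(id: String, count: Int)

  val z: Count = Count("", 0) // zeroValue
  def combOp(x: Count, y: Count): Count = Count(x.id, x.count + y.count)

  // Left identity: combining zero on the left should give x back unchanged.
  def leftIdentityHolds(x: Count): Boolean = combOp(z, x) == x

  // Right identity: combining zero on the right should give x back unchanged.
  def rightIdentityHolds(x: Count): Boolean = combOp(x, z) == x
}
```

For `Count("a", 10)`, the right identity holds, but the left identity does not: `combOp(z, x)` keeps the id of its first argument, so the result is `Count("", 10)`.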

Scio at Philly ETE

It’s been another 6 months since my talk about Scio at Scala by the Bay. We’ve seen huge adoption and improvements since then. The number of production Scio pipelines has grown from ~70 to 400+ within Spotify. A lot of other companies are using and contributing to it as well. In the most recent edition of the Spotify data university, an internal week-long big data training camp for non-data engineers, we revamped the curriculum to cover Scio, BigQuery and other Google Cloud Big Data products instead of Hadoop, Scalding and Hive.

And here’s a list of some notable improvements in Scio.

  • Master branch is now based on Apache Beam
  • Graduate type safe BigQuery API from experimental to stable
  • Sparkey side input support
  • TensorFlow TFRecord file IO
  • Cloud Pub/Sub attributes support
  • Named transformations for streaming update
  • Safe-guard against malformed tests and better error messages
  • Flexible custom IO wiring
  • KryoRegistrar for custom Kryo serialization
  • Table description for type-safe BigQuery
  • Lots of performance improvements and bug fixes

I talked about …

more ...

Joins

We recently started teaching Scio at Spotify’s internal data university and I made these slides to explain how joins work and some lower level details.

more ...

Scio at Scala by the Bay

It’s been 7 months since we first announced Scio at GCPNEXT16. There are now dozens of internal teams and a couple of other companies using Scio to run hundreds of pipelines on a daily basis. Within Spotify, Scio is now the preferred framework for building new data pipelines on Google Cloud Platform. We’ve also made 19 releases and added tons of features and improvements. Below is a list of some notable ones.

  • Interactive REPL
  • Type safe BigQuery macro improvements and Scio-IDEA-plugin
  • BigQuery standard SQL 2011 syntax support
  • HDFS source and sink
  • Avro file compression support
  • Bigtable multi-table sink and utility for cluster scaling
  • Protobuf file support and usability improvements
  • Accumulator usability improvements
  • End-to-end testing utilities and matchers improvements
  • Join performance improvements and skewed join
  • Metrics interface and enhancements

I talked about Scio at Scala by the Bay last week and here are the slides.

more ...


Scio, a Scala API for Google Cloud Dataflow

We recently open sourced Scio, a Scala API for Google Cloud Dataflow. Here are the slides of our talk at GCPNEXT16 a few weeks ago.

The first half of the talk covers our experiments with Dataflow and Pub/Sub for streaming applications, while the second half covers Scio and BigQuery for batch analysis and machine learning.

more ...