I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful Monoid-like use of aggregateByKey in Scio.
Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:
```scala
// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]

// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]
```
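Semantically, seqOp folds each element into a per-partition accumulator seeded with zeroValue, and combOp merges the resulting accumulators. Here is a minimal sketch of that contract over plain Scala collections; simulateAggregate is a hypothetical stand-in for the runner, not Scio's actual implementation, and the partitioning is whatever the runner decides:

```scala
// A minimal sketch of the aggregate contract, assuming the input is split
// into partitions/bundles the way a distributed runner might split it.
def simulateAggregate[V, U](partitions: Seq[Seq[V]], zeroValue: U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): U = {
  // Each partition folds its elements into its own copy of zeroValue...
  val partials = partitions.map(_.foldLeft(zeroValue)(seqOp))
  // ...and the per-partition accumulators are merged pairwise with combOp
  // (a real runner makes no ordering guarantee).
  partials.reduce(combOp)
}
```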
And we have some business logic that looks like this:
```scala
case class Count(id: String, count: Int)

val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id) // aggregateByKey needs an SCollection[(K, V)]
  .aggregateByKey(z)(seqOp, combOp)
```
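Summing per key by hand, the intended output is ("a", Count("a", 110)) and ("b", Count("b", 55)).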
This code, however, only works correctly locally with the DirectRunner, and always produces results with id == "" when running on the Dataflow service with the DataflowRunner. Can you spot the bug?
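One way to reproduce the symptom without submitting a Dataflow job is to replay the same ops through the hypothetical simulateAggregate sketch above. A single partition behaves like a single-machine runner; an empty partition contributes a bare zeroValue accumulator, much as an empty bundle on a distributed runner would:

```scala
val data = Seq(Count("a", 10), Count("a", 100))

// One partition, as a single-machine runner might process it: correct.
simulateAggregate(Seq(data), z)(seqOp, combOp)
// => Count("a", 110)

// An empty partition's accumulator is just zeroValue; merged on the left,
// combOp keeps its id and the empty string leaks into the result.
simulateAggregate(Seq(Seq.empty[Count], data), z)(seqOp, combOp)
// => Count("", 110)
```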
You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:
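```scala
// Monoid identity law: zeroValue must be an identity element for combOp.
combOp(zeroValue, x) == x
combOp(x, zeroValue) == x
```

The combOp above breaks the left identity: combOp(z, Count("a", 110)) is Count("", 110), not Count("a", 110), because combOp always keeps its first argument's id and z.id is the empty string. That is exactly the case a distributed runner can exercise when it merges a bare zeroValue accumulator into a partial result.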