I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful Monoid-like use of `aggregateByKey` in Scio.

## The Problem

Both Scio and Spark have `aggregate` and `aggregateByKey` transformations that look like this:

```
// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]
// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]
```
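To make the roles of `zeroValue`, `seqOp`, and `combOp` concrete, here is a rough local model of `aggregate` over plain Scala collections. It is only a sketch under stated assumptions, not how Scio or Spark implement it: `AggregateModel`, `aggregateModel`, and the nested `Seq` standing in for partitions are hypothetical, and a real runner is free to split the data and merge accumulators differently.

```
object AggregateModel {
  // Hypothetical local model of `aggregate`; NOT the actual Scio/Spark code.
  // `partitions` stands in for however a runner happens to split the data.
  def aggregateModel[V, U](partitions: Seq[Seq[V]])(zeroValue: U)(
      seqOp: (U, V) => U,
      combOp: (U, U) => U): U =
    partitions
      .map(_.foldLeft(zeroValue)(seqOp)) // one accumulator per partition
      .foldLeft(zeroValue)(combOp)       // merge accumulators, starting from zeroValue again

  // Total length of all strings, spread over two "partitions".
  val total: Int = aggregateModel(Seq(Seq("a", "bb"), Seq("ccc")))(0)(
    (acc: Int, s: String) => acc + s.length,
    (x: Int, y: Int) => x + y)
}
```

In this model `zeroValue` is used more than once: as the start of every per-partition fold and again when the partial results are merged, so the result should not depend on how often it gets combined in.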

And we have some business logic that looks like this:

```
case class Count(id: String, count: Int)
val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count)
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)
sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id) // (id, Count) pairs; groupBy would yield (id, Iterable[Count]) and not fit seqOp
  .aggregateByKey(z)(seqOp, combOp)
```

This code, however, only works correctly when run locally with `DirectRunner`. On the Dataflow service with `DataflowRunner`, it always produces results with `id == ""`. Can you spot the bug?

## Monoid laws

You might notice that `zeroValue` and `combOp` together resemble a Monoid, which should satisfy the identity law:
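For a Monoid, the identity law says that combining any value `x` with the zero, on either side, gives back `x`: `combOp(z, x) == x` and `combOp(x, z) == x`. A minimal plain-Scala check, reusing `Count`, `z`, and `combOp` from the business logic above (the `IdentityLawCheck` object and the `...Holds` names are illustrative only), can be sketched as:

```
object IdentityLawCheck {
  case class Count(id: String, count: Int)
  val z = Count("", 0) // zeroValue
  def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count)

  val a = Count("a", 10)
  // z on the right: the id of `a` is kept, so this side of the law holds
  val rightIdentityHolds: Boolean = combOp(a, z) == a
  // z on the left: the result takes the id of z, i.e. "", so this side fails
  val leftIdentityHolds: Boolean = combOp(z, a) == a
}
```

Here `rightIdentityHolds` is `true` while `leftIdentityHolds` is `false`, so `z` and `combOp` do not form a lawful Monoid: whenever a zero-valued accumulator ends up on the left of a merge, the `id` is lost.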

more ...