Quick and easy coders for POJOs in your Beam pipeline

Tianzi Cai
Google Cloud - Community
3 min read · Jul 23, 2021


You’re not alone. Many of us, when writing our first Apache Beam pipeline in Java, were stopped in our tracks by this error message:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().

This article demonstrates a few ways to make the error go away by serializing your Plain Old Java Objects (POJOs) using:

  • Serializable
  • Apache Avro
  • Protocol Buffers

Full examples are available on GitHub.

Let’s look at a simple Beam pipeline. It creates a collection of Player objects and prints out the Player names and scores.

  1. POJO — this is the Player class definition.
  2. Beam pipeline — this is a simple pipeline that first creates a PCollection of Player objects and then applies an element-wise MapElements transform to each one (see the sketch below).
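The full class and pipeline live in the GitHub examples linked above. As a rough sketch (the field names, scores, and the printing step are illustrative, not copied from the repo), the two pieces might look like this. As written, Player has no coder yet, so this is the version that triggers the error above:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PlayerExample {

  // The POJO: as written, Beam cannot infer a coder for it yet.
  static class Player {
    String name;
    int score;

    Player(String name, int score) {
      this.name = name;
      this.score = score;
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        // 1. Create a PCollection of Players. Without a coder, this is
        //    where the "Unable to infer a coder" error is raised.
        .apply(Create.of(
            new Player("kestrel", 100),
            new Player("owl", 22),
            new Player("finch", 95)))
        // 2. Element-wise MapElements that formats and prints each Player.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(player -> {
              String line = player.name + ": " + player.score;
              System.out.println(line);
              return line;
            }));

    pipeline.run().waitUntilFinish();
  }
}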

Serializable

Implementing Serializable is the most straightforward way to serialize your POJO.

  1. Make your class implement Serializable:
static class Player implements Serializable {}

  2. Generate code for equals() and hashCode():

In IntelliJ, this can be accomplished by navigating to Code > Generate … > equals() and hashCode().
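Putting the two steps together, the Serializable Player might look roughly like this (a sketch; the equals() and hashCode() bodies your IDE generates may differ). Beam falls back to SerializableCoder for classes that implement Serializable:

import java.io.Serializable;
import java.util.Objects;

// Nested inside the pipeline class, as in the snippet above.
static class Player implements Serializable {
  String name;
  int score;

  Player(String name, int score) {
    this.name = name;
    this.score = score;
  }

  // Deterministic equality so Beam can compare deserialized copies.
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof Player)) return false;
    Player other = (Player) o;
    return score == other.score && Objects.equals(name, other.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(name, score);
  }
}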

Run the pipeline:

mvn compile exec:java -Dexec.mainClass=SerializablePlayerExample

It will use the DirectRunner unless you specify a different runner. The Player names and scores appear in the output; note that the elements can come in any order.

kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0

See full example here.

Avro

Don’t worry if you have not heard of Apache Avro. It is a widely used data serialization system supported by Beam.

The following steps use Avro to generate a Java class based on your POJO:

  1. Download Avro.
  2. Build Avro using Maven.
cd avro-src-1.10.2/lang/java/tools/
mvn clean install

  3. Create an Avro schema file formatted in JSON that matches your class definition. See supported types.

AvroPlayer.avsc

{
  "type": "record",
  "name": "AvroPlayer",
  "namespace": "utilities",
  "doc": "A player.",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "doc": "The player name."
    },
    {
      "name": "score",
      "type": "int",
      "doc": "The player score."
    }
  ]
}

  4. Generate a Java class based on your class definition in the .avsc file:

java -jar \
~/avro-src-1.10.2/lang/java/tools/target/avro-tools-1.10.2.jar \
compile schema \
path/to/your/.avsc/file \
destination/folder/
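With the generated class on the classpath, the pipeline can tell Beam explicitly how to encode it. Here is a minimal sketch, assuming the generated class landed in the utilities package (per the namespace in the .avsc) and a Beam version where AvroCoder lives in the core SDK; the wiring in the GitHub example may differ:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import utilities.AvroPlayer;

public class AvroPlayerExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        // The Avro-generated AvroPlayer class ships with a builder.
        .apply(Create.of(
                AvroPlayer.newBuilder().setName("kestrel").setScore(100).build(),
                AvroPlayer.newBuilder().setName("owl").setScore(22).build(),
                AvroPlayer.newBuilder().setName("finch").setScore(95).build())
            // Tell Beam explicitly how to encode the elements.
            .withCoder(AvroCoder.of(AvroPlayer.class)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(p -> {
              String line = p.getName() + ": " + p.getScore();
              System.out.println(line);
              return line;
            }));

    pipeline.run().waitUntilFinish();
  }
}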

Run the pipeline:

mvn compile exec:java -Dexec.mainClass=AvroPlayerExample

Again, it will run using the DirectRunner. The output elements can be in any order.

kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0

See full example here.

Protocol Buffers

Google Protocol Buffers is another widely used data serialization system supported by Beam.

The following steps will generate a Java class from a Protocol Buffers definition of the POJO:

  1. Install protoc. Homebrew worked for me on macOS:
brew install protobuf

  2. Create a Proto schema file that matches your class definition.

ProtoPlayer.proto

syntax = "proto3";package utilities;option java_outer_classname = "ProtoPlayer";message Player {
string name = 1;
int32 score = 2;
}

  3. Generate a Java class based on your class definition in the .proto file:

protoc \
--proto_path /your/.proto/file/folder/ \
--java_out destination/folder/ \
path/to/your/.proto/file
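As with Avro, the pipeline can hand Beam a coder for the generated message explicitly. A minimal sketch, assuming the generated class is utilities.ProtoPlayer.Player and that the beam-sdks-java-extensions-protobuf dependency is on the classpath; the GitHub example may wire this up differently:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import utilities.ProtoPlayer;

public class ProtoPlayerExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        // protoc generates an immutable Player message with a builder.
        .apply(Create.of(
                ProtoPlayer.Player.newBuilder().setName("kestrel").setScore(100).build(),
                ProtoPlayer.Player.newBuilder().setName("owl").setScore(22).build(),
                ProtoPlayer.Player.newBuilder().setName("finch").setScore(95).build())
            // ProtoCoder comes from the protobuf extension module.
            .withCoder(ProtoCoder.of(ProtoPlayer.Player.class)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(p -> {
              String line = p.getName() + ": " + p.getScore();
              System.out.println(line);
              return line;
            }));

    pipeline.run().waitUntilFinish();
  }
}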

Again, run the pipeline:

mvn compile exec:java -Dexec.mainClass=ProtoPlayerExample

It will run using the DirectRunner unless otherwise specified. The output elements can be in any order.

kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0

See full example here.

This article covers just a few ways to serialize your data, but Beam supports many more. I hope this helps unblock some of you in your quest to code real business logic with Beam!
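One of them is Beam’s schema support, hinted at by Create.withSchema() in the error message at the top. As a rough sketch (not taken from the repo), annotating the POJO lets Beam infer the encoding on its own:

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// With an inferred schema, Beam derives a coder for Player automatically,
// so no explicit withCoder() call is needed. JavaFieldSchema expects
// public fields and a no-argument constructor.
@DefaultSchema(JavaFieldSchema.class)
static class Player {
  public String name;
  public int score;

  public Player() {}

  public Player(String name, int score) {
    this.name = name;
    this.score = score;
  }
}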
