Quick and easy coders for POJOs in your Beam pipeline
You’re not alone. Many of us, when writing our first Apache Beam pipeline in Java, were stopped in our tracks by this error message:
Exception in thread "main" java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly or a schema by invoking Create.withSchema().
This article demonstrates a few ways to make the error go away by serializing your Plain Old Java Objects (POJOs) using:
- Serializable
- Apache Avro
- Protocol Buffers
Full examples are available on GitHub.
Let’s look at a simple Beam pipeline. It creates a collection of Player objects and prints out the Player names and scores.
1. POJO — this is the Player class definition.
2. Beam pipeline — this is a simple pipeline that first creates a PCollection of Players and then performs an element-wise MapElements transform on each Player object.
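The pieces above can be sketched in plain Java, with the Beam boilerplate omitted so the sketch runs on its own. The field names and the `name: score` output format follow the sample output shown later in this article; the helper name `formatPlayer` is a hypothetical stand-in for the mapping logic inside the MapElements transform:

```java
// A minimal Player POJO matching the article's description: a name and a score.
public class PlayerSketch {
    static class Player {
        final String name;
        final int score;

        Player(String name, int score) {
            this.name = name;
            this.score = score;
        }
    }

    // The element-wise logic the MapElements transform applies to each Player.
    static String formatPlayer(Player p) {
        return p.name + ": " + p.score;
    }

    public static void main(String[] args) {
        Player[] players = {
            new Player("kestrel", 100), new Player("owl", 22), new Player("finch", 95)
        };
        for (Player p : players) {
            System.out.println(formatPlayer(p)); // prints e.g. "kestrel: 100"
        }
    }
}
```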
Serializable
Implementing Serializable is the most straightforward way to serialize your POJO.
1. Make your class implement Serializable:
static class Player implements Serializable {}
2. Generate code for equals() and hashCode():
In IntelliJ, this can be accomplished by navigating to Code > Generate … > equals() and hashCode().
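Putting steps 1 and 2 together, here is a sketch (the bodies are illustrative, not the exact code from the GitHub example) showing a Serializable Player with generated-style equals() and hashCode() surviving a Java serialization round trip — the same mechanism Beam's SerializableCoder relies on:

```java
import java.io.*;
import java.util.Objects;

public class SerializableSketch {
    static class Player implements Serializable {
        final String name;
        final int score;

        Player(String name, int score) {
            this.name = name;
            this.score = score;
        }

        // equals() and hashCode() as an IDE would generate them.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Player)) return false;
            Player other = (Player) o;
            return score == other.score && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, score);
        }
    }

    // Round-trip a Player through Java serialization and back.
    static Player roundTrip(Player p) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Player) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Player original = new Player("kestrel", 100);
        Player copy = roundTrip(original);
        System.out.println(copy.equals(original)); // prints "true", via the generated equals()
    }
}
```

Without the generated equals(), the round-tripped copy would compare unequal to the original by reference, which is why step 2 matters.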
Run the pipeline:
mvn compile exec:java -Dexec.mainClass=SerializablePlayerExample
It will use the DirectRunner unless you specify a different runner. The Player names and scores appear in the output. Note that the output elements can be in any order.
kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0
See full example here.
Avro
Don’t worry if you have not heard of Apache Avro. It is a widely used data serialization system supported by Beam.
The following steps will generate a Java class using Avro based on your POJO:
1. Download Avro.
2. Build Avro using Maven:
cd avro-src-1.10.2/lang/java/tools/
mvn clean install
3. Create an Avro schema file formatted in JSON that matches your class definition. See supported types.
AvroPlayer.avsc
{
  "type": "record",
  "name": "AvroPlayer",
  "namespace": "utilities",
  "doc": "A player.",
  "fields": [
    {
      "name": "name",
      "type": "string",
      "doc": "The player name."
    },
    {
      "name": "score",
      "type": "int",
      "doc": "The player score."
    }
  ]
}
4. Generate a Java class based on your class definition in the .avsc file:
java -jar \
~/avro-src-1.10.2/lang/java/tools/target/avro-tools-1.10.2.jar \
compile schema \
path/to/your/.avsc/file \
destination/folder/
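With the generated class on the classpath, you can hand Beam a coder explicitly via Create.withCoder(), which resolves the "Unable to infer a coder" error up front. This is a hedged sketch, not the exact GitHub example: it assumes a Beam 2.x release where AvroCoder lives in the core SDK (in newer releases it moved to the beam-sdks-java-extensions-avro module), and the player values are illustrative:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

import utilities.AvroPlayer; // generated from AvroPlayer.avsc above

public class AvroCoderSketch {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Setting the coder explicitly with AvroCoder avoids coder inference entirely.
        PCollection<AvroPlayer> players = pipeline.apply(
                Create.of(
                        new AvroPlayer("kestrel", 100),
                        new AvroPlayer("owl", 22),
                        new AvroPlayer("finch", 95))
                    .withCoder(AvroCoder.of(AvroPlayer.class)));

        pipeline.run().waitUntilFinish();
    }
}
```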
Run the pipeline:
mvn compile exec:java -Dexec.mainClass=AvroPlayerExample
Again, it will run using the DirectRunner. The output elements can be in any order.
kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0
See full example here.
Protocol Buffers
Google Protocol Buffers is another widely used data serialization system supported by Beam.
The following steps will generate a Java class in protocol buffers based on the POJO:
1. Install protoc. Homebrew worked for me on macOS:
brew install protobuf
2. Create a Proto schema file that matches your class definition.
ProtoPlayer.proto
syntax = "proto3";

package utilities;

option java_outer_classname = "ProtoPlayer";

message Player {
  string name = 1;
  int32 score = 2;
}
3. Generate a Java class based on your class definition in the .proto file:
protoc \
--proto_path /your/.proto/file/folder/ \
--java_out destination/folder/ \
path/to/your/.proto/file
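As with Avro, the generated class can be paired with an explicit coder — here ProtoCoder, from the beam-sdks-java-extensions-protobuf module. A sketch under those assumptions (the player values are illustrative; protoc generates the builder API shown for a message named Player inside outer class ProtoPlayer):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

import utilities.ProtoPlayer; // the outer class generated from ProtoPlayer.proto

public class ProtoCoderSketch {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Protobuf messages are built, not constructed; ProtoCoder handles serialization.
        PCollection<ProtoPlayer.Player> players = pipeline.apply(
                Create.of(
                        ProtoPlayer.Player.newBuilder().setName("kestrel").setScore(100).build(),
                        ProtoPlayer.Player.newBuilder().setName("owl").setScore(22).build())
                    .withCoder(ProtoCoder.of(ProtoPlayer.Player.class)));

        pipeline.run().waitUntilFinish();
    }
}
```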
Again, run the pipeline:
mvn compile exec:java -Dexec.mainClass=ProtoPlayerExample
It will run using the DirectRunner unless otherwise specified. The output elements can be in any order.
kestrel: 100
owl: 22
finch: 95
Process finished with exit code 0
See full example here.
This article covers just a few of the ways to serialize your data; Beam supports many more. I hope this helps unblock you in your quest to code real business logic with Beam!