Develop Pulsar connectors for Pub/Sub

Tianzi Cai
Google Cloud - Community
7 min read · May 14, 2023


Pulsar connectors for Pub/Sub

This blog post covers the key steps in developing Apache Pulsar connectors for Google Cloud Pub/Sub. It contains working code that supports the most basic data relay scenarios, presents an automation framework for building and testing on GitHub, and discusses some considerations for these steps. If you’re new to Pub/Sub or to developing Pulsar connectors with Pulsar IO, this blog post is a great place to get started.

Pub/Sub sink connector

Working on the sink connector before the source connector gives you a chance to familiarize yourself with sending messages to Pub/Sub, which is the simpler of the two tasks. You can take apart the official Pub/Sub Java quickstart samples and place the publisher client creation, the publish calls, and the client shutdown code in the open(), write(), and close() methods of the Sink interface implementation.

package org.apache.pulsar.io.gcp;

import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

@Slf4j
public class PubsubSink implements Sink<GenericObject> {

  private Publisher publisher = null;

  @Override
  public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
    PubsubSinkConfig pubsubSinkConfig = PubsubSinkConfig.load(config);
    TopicName topicName =
        TopicName.of(pubsubSinkConfig.getProjectId(), pubsubSinkConfig.getTopicId());
    BatchingSettings batchingSettings =
        BatchingSettings.newBuilder()
            .setElementCountThreshold(pubsubSinkConfig.getBatchSize())
            .build();
    this.publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
  }

  @Override
  public void write(Record<GenericObject> record) {
    if (record.getMessage().isPresent()) {
      ByteString data = ByteString.copyFrom(record.getMessage().get().getData());
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
      // publish() returns an ApiFuture; a production-grade sink would ack or
      // fail the Pulsar record in a callback on that future.
      this.publisher.publish(pubsubMessage);
    }
  }

  @Override
  public void close() throws Exception {
    if (this.publisher != null) {
      this.publisher.shutdown();
      // Wait for outstanding batched messages to be sent before returning.
      this.publisher.awaitTermination(30, TimeUnit.SECONDS);
      log.info("Pub/Sub sink has been shut down.");
    }
  }
}

This example enables batching in the Pub/Sub publisher. If you are already familiar with batching in Pulsar, this is nothing new: setting a maximum number of messages per batch enables higher throughput. The publisher client supports additional batching settings, such as a delay threshold and a request byte threshold, which correspond roughly to the batchingMaxPublishDelay and batchingMaxBytes settings on a Pulsar producer.
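For illustration, here is a sketch that could replace the simpler BatchingSettings built in open() above. The thresholds are arbitrary values for demonstration, not recommendations:

BatchingSettings batchingSettings =
    BatchingSettings.newBuilder()
        .setElementCountThreshold(100L) // ~ Pulsar batchingMaxMessages
        .setRequestByteThreshold(1_000_000L) // ~ Pulsar batchingMaxBytes
        .setDelayThreshold(org.threeten.bp.Duration.ofMillis(10)) // ~ Pulsar batchingMaxPublishDelay
        .build();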

To add these custom settings to your sink connector: first, implement them in your sink config class as shown below; then, call them from the sink connector class as shown above.

package org.apache.pulsar.io.gcp;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Accessors(chain = true)
public class PubsubSinkConfig implements Serializable {

  @FieldDoc(required = true, defaultValue = "", help = "Google Cloud project ID")
  private String projectId = "";

  @FieldDoc(required = true, defaultValue = "", help = "Google Cloud Pub/Sub topic ID")
  private String topicId = "";

  @FieldDoc(required = false, defaultValue = "10", help = "Publisher batch size")
  private Long batchSize = 10L;

  public static PubsubSinkConfig load(Map<String, Object> map) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(mapper.writeValueAsString(map), PubsubSinkConfig.class);
  }
}
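With this config class in place, a sink config file like the tz-pubsub-sink.yaml referenced in the Test section below might look like the following sketch; the tenant, namespace, topic names, and values here are hypothetical:

tenant: public
namespace: default
name: pubsub-sink
inputs:
  - persistent://public/default/my-input-topic
configs:
  projectId: my-gcp-project
  topicId: my-pubsub-topic
  batchSize: 10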

Pub/Sub source connector

At a high level, the source connector code mirrors the sink connector code. We still create and shut down the Pub/Sub subscriber client in the open() and close() methods of the Source interface implementation. In addition, a PubsubRecord class implements the Pulsar Record interface and converts each Pub/Sub message into a Pulsar record.

As an aside, a LinkedBlockingQueue could be used to temporarily store messages received from the Pub/Sub subscription. If so, in the read() method of a Source interface implementation, you could pop the oldest message from the queue and forward it to a Pulsar topic. However, this can introduce message loss if your program fails after a message is acknowledged on the Pub/Sub side but before it is successfully written to Pulsar. (Credit: Kamal Aboul-Hosn)

A better implementation passes the AckReplyConsumer from Pub/Sub as an input argument to PubsubRecord, where we override the ack() method and use the AckReplyConsumer to acknowledge the Pub/Sub message. This ensures that the acknowledgement on the Pub/Sub side happens only after the message has been successfully written to Pulsar.

package org.apache.pulsar.io.gcp;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;

@Slf4j
public class PubsubSource extends PushSource<byte[]> {

  private Subscriber subscriber = null;

  @Override
  public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
    PubsubSourceConfig pubsubSourceConfig = PubsubSourceConfig.load(config);

    ProjectSubscriptionName projectSubscriptionName =
        ProjectSubscriptionName.of(
            pubsubSourceConfig.getProjectId(), pubsubSourceConfig.getSubscriptionId());

    ExecutorProvider executorProvider =
        InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            .setMaxOutstandingElementCount(pubsubSourceConfig.getFlowSize())
            .build();

    Subscriber.Builder subscriberBuilder =
        Subscriber.newBuilder(
            projectSubscriptionName,
            (PubsubMessage message, AckReplyConsumer consumer) -> {
              // Wrap the Pub/Sub message and its ack handle in a Pulsar record;
              // the message is acked only when Pulsar calls PubsubRecord.ack().
              Record<byte[]> record =
                  new PubsubRecord(sourceContext.getOutputTopic(), message, consumer);
              consume(record);
            });
    subscriberBuilder.setFlowControlSettings(flowControlSettings);
    subscriberBuilder.setParallelPullCount(pubsubSourceConfig.getNumStreams());
    subscriberBuilder.setExecutorProvider(executorProvider);
    subscriber = subscriberBuilder.build();
    subscriber.startAsync().awaitRunning();
    log.info("Listening for messages on {}...", subscriber.getSubscriptionNameString());
  }

  @Override
  public void close() {
    if (this.subscriber != null) {
      this.subscriber.stopAsync().awaitTerminated();
    }
  }

  private record PubsubRecord(
      String pulsarTopic, PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer)
      implements Record<byte[]> {

    @Override
    public Optional<String> getKey() {
      if (!this.pubsubMessage.getOrderingKey().isEmpty()) {
        return Optional.of(this.pubsubMessage.getOrderingKey());
      } else {
        return Optional.empty();
      }
    }

    @Override
    public byte[] getValue() {
      return this.pubsubMessage.getData().toByteArray();
    }

    @Override
    public Optional<Long> getEventTime() {
      // Pulsar event time is epoch milliseconds; getSeconds() alone would be
      // off by a factor of 1000.
      return Optional.of(this.pubsubMessage.getPublishTime().getSeconds() * 1000L);
    }

    @Override
    public Map<String, String> getProperties() {
      return this.pubsubMessage.getAttributesMap();
    }

    @Override
    public Optional<String> getDestinationTopic() {
      return Optional.of(this.pulsarTopic);
    }

    @Override
    public void ack() {
      ackReplyConsumer.ack();
    }

    @Override
    public void fail() {
      ackReplyConsumer.nack();
    }
  }
}

Besides performing simple message forwarding, this example also shows how to enable flow control and concurrency control in the Pub/Sub subscriber client. Flow control ensures that no single subscriber gets overwhelmed or starved. Concurrency control lets you open multiple gRPC bidirectional streams for pulling messages, via setParallelPullCount(), and use a multiple of that number of threads to process messages in the message callback, via setExecutorThreadCount().

The example takes user input for the former and hard-codes the latter. Generally, you should let users decide both values based on how CPU-bound their message processing is and how well they can handle the incoming message volume.
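The source config class is not shown above; here is a minimal sketch of what PubsubSourceConfig might look like, mirroring the sink config and with field names inferred from the getters used in open(). The defaults are assumptions for illustration:

package org.apache.pulsar.io.gcp;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.pulsar.io.core.annotations.FieldDoc;

@Data
@Accessors(chain = true)
public class PubsubSourceConfig implements Serializable {

  @FieldDoc(required = true, defaultValue = "", help = "Google Cloud project ID")
  private String projectId = "";

  @FieldDoc(required = true, defaultValue = "", help = "Google Cloud Pub/Sub subscription ID")
  private String subscriptionId = "";

  @FieldDoc(required = false, defaultValue = "1000", help = "Max outstanding message count")
  private Long flowSize = 1000L;

  @FieldDoc(required = false, defaultValue = "1", help = "Number of parallel pull streams")
  private Integer numStreams = 1;

  public static PubsubSourceConfig load(Map<String, Object> map) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    return mapper.readValue(mapper.writeValueAsString(map), PubsubSourceConfig.class);
  }
}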

Other subscriber features, such as exactly-once delivery and message filtering, can also be implemented.
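For example, on a subscription with exactly-once delivery enabled, the subscriber client offers a receiver variant whose ack() returns a Future<AckResponse> confirming whether Pub/Sub durably recorded the acknowledgement. A sketch:

// Sketch only: MessageReceiverWithAckResponse and AckReplyConsumerWithResponse
// come from com.google.cloud.pubsub.v1.
MessageReceiverWithAckResponse receiver =
    (PubsubMessage message, AckReplyConsumerWithResponse consumer) -> {
      // Forward the message to Pulsar first, then:
      Future<AckResponse> ackResponse = consumer.ack();
      // AckResponse.SUCCESSFUL means the message will not be redelivered.
    };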

Build

The connectors here use Apache Pulsar 2.11.x and Java 17. To package them as a NAR, you can add the following build plugin to your pom.xml:

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-nar-maven-plugin</artifactId>
      <version>${nifi.nar.plugin.version}</version>
      <extensions>true</extensions>
      <configuration>
        <finalName>${project.artifactId}-${project.version}</finalName>
      </configuration>
      <executions>
        <execution>
          <id>default-nar</id>
          <phase>package</phase>
          <goals>
            <goal>nar</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

It is also necessary to create a META-INF/services/pulsar-io.yaml file that declares the connector and its entry-point classes:

name: pubsub-connector
description: Pub/Sub Sink and Source Connectors
sinkClass: org.apache.pulsar.io.gcp.PubsubSink
sinkConfigClass: org.apache.pulsar.io.gcp.PubsubSinkConfig
sourceClass: org.apache.pulsar.io.gcp.PubsubSource
sourceConfigClass: org.apache.pulsar.io.gcp.PubsubSourceConfig

In my case, running mvn clean package -DskipTests=true creates a target/tz-pulsar-io-1.0-SNAPSHOT.nar of roughly 84 MB.

Test

The README file in my repo describes how to run the connectors with a local standalone Pulsar cluster, which is how I tested them during development. However, that was not an efficient way to test because it involves a number of manual setup steps. Leveraging Cloud Build to start a local standalone Pulsar cluster saves a lot of time here.

FROM maven:3.9-amazoncorretto-17-debian
RUN apt-get update && \
    apt-get install -yq wget procps && \
    wget https://archive.apache.org/dist/pulsar/pulsar-2.11.1/apache-pulsar-2.11.1-bin.tar.gz && \
    tar xvfz apache-pulsar-2.11.1-bin.tar.gz && \
    cd apache-pulsar-2.11.1 && \
    pwd

This Dockerfile pulls a base image with Java 17 and Maven preinstalled. It also installs wget, which is used to download Pulsar.

steps:
  - id: 'pulsar-standalone'
    name: gcr.io/cloud-builders/docker
    args: ['build', '-t', '${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}', '.']

  - id: 'run-tests'
    name: ${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}
    script: |
      #!/usr/bin/env bash
      export PULSAR_HOME=/apache-pulsar-2.11.1/
      ${PULSAR_HOME}/bin/pulsar-daemon start standalone
      sleep 20
      ${PULSAR_HOME}/bin/pulsar-admin brokers healthcheck
      mvn clean package -DskipTests=true
      cp target/tz-pulsar-io-1.0-SNAPSHOT.nar ${PULSAR_HOME}/examples/
      cp -r src/test/resources/ ${PULSAR_HOME}/examples/
      ${PULSAR_HOME}/bin/pulsar-admin sinks localrun --sink-config-file ${PULSAR_HOME}/examples/resources/tz-pubsub-sink.yaml --archive ${PULSAR_HOME}/examples/tz-pulsar-io-1.0-SNAPSHOT.nar &
      mvn test -Dtest=org.apache.pulsar.io.gcp.PubsubSinkTest
      ${PULSAR_HOME}/bin/pulsar-admin sources localrun --source-config-file ${PULSAR_HOME}/examples/resources/tz-pubsub-source.yaml --archive ${PULSAR_HOME}/examples/tz-pulsar-io-1.0-SNAPSHOT.nar &
      mvn test -Dtest=org.apache.pulsar.io.gcp.PubsubSourceTest
    env:
      - 'PROJECT_ID=${PROJECT_ID}'
      - 'TOPIC_ID=${_TOPIC_ID}'

images:
  - '${_LOCATION}-docker.pkg.dev/$PROJECT_ID/${_REPOSITORY}/${_IMAGE}'
substitutions:
  _LOCATION: us-central1
  _REPOSITORY: pulsar
  _IMAGE: pulsar-standalone

This cloudbuild.yaml consists of two steps. Step #1, pulsar-standalone, builds a Docker image from the Dockerfile and uploads it to Artifact Registry. From there, step #2, run-tests, starts a standalone Pulsar cluster, packages a NAR for the connectors, and then runs the sink and source integration tests separately with the help of the Pulsar CLI tools.

Admittedly, it took me some effort to get this script working right. Here are the reasons for some odd-looking choices in it:

  • sleep 20 is necessary because the healthcheck command does not return ok right away, and a failure there causes the rest of the test to fail. (A more robust alternative is to poll the healthcheck, as sketched after this list.)
  • Copying the NAR to ${PULSAR_HOME} from /workspace, the default volume that Cloud Build provides, is also necessary. Pointing --archive at a packaged NAR in /workspace returned errors like Sink doesn't exist and Source doesn't exist.
  • Using the & operator to run the sink and source connectors in the background ensures a running connector is available while mvn test executes the integration tests.
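For instance, instead of a fixed sleep, the script could retry the healthcheck until it succeeds; a sketch, where the 12 × 5-second budget is arbitrary:

# Poll the broker healthcheck for up to ~60 seconds instead of sleeping blindly.
for i in $(seq 1 12); do
  if ${PULSAR_HOME}/bin/pulsar-admin brokers healthcheck; then
    break
  fi
  sleep 5
done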

I set up a Cloud Build trigger to run integration tests on each new commit to the main branch. There are other triggers available too, such as pushes to a different branch, commits with certain tags, or pull requests.
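For reference, a GitHub trigger like this can be created from the command line; the trigger name, repository owner, and repository name below are placeholders:

gcloud builds triggers create github \
  --name=pulsar-connector-tests \
  --repo-owner=YOUR_GITHUB_USER \
  --repo-name=YOUR_REPO \
  --branch-pattern='^main$' \
  --build-config=cloudbuild.yaml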

This is a screenshot of my Cloud Build history. Build faf6afb7 was triggered by my latest commit to update README.md.

All the working code in this blog post can be found on GitHub. Please note that although the connectors work as intended, they are not production-grade, and no release has been made. This is a simple implementation meant for learning purposes and does not support all the features of Pub/Sub. However, the CI/CD setup using Cloud Build is widely applicable, and I highly recommend it in your development and release cycles. I hope you find this blog post helpful. Don’t hesitate to reach out with questions!
