Streaming Salesforce Events to Heroku Kafka

Heroku recently announced the new Heroku Kafka service, making it possible to have a managed, distributed commit log in the cloud. Apache Kafka was originally developed at LinkedIn and provides a high-throughput, low-latency event-based system. (Note: for more details on what a “distributed commit log” is and why LinkedIn created Kafka, check out The Log: What every software engineer should know about real-time data’s unifying abstraction.) With Kafka, event producers send events to the Kafka cluster and consumers subscribe to those events. This is similar to a traditional messaging system, but the main differences are that event order is guaranteed and events are durable, with a configurable retention time. Kafka is used in all sorts of situations where an event stream needs to be ingested and then processed by one or more systems; the Salesforce IoT Cloud, for example, is built with Kafka so that it can handle a stream of device data and then process those events.

This post walks you through getting started with Heroku Kafka, using the Salesforce Streaming API as an event producer. Future posts will focus on the event-processing side of a Kafka system.

Salesforce Streaming API

The Salesforce Streaming API captures data events in Salesforce, allowing subscribers to receive those events in near real-time over a persistent HTTP connection.  The data events are in the context of an SObject and are the same events available to triggers: create, update, delete, and undelete.  To set up streaming from Salesforce, you first need a “PushTopic” record, which defines a SOQL query and the events that, when matched, are sent to any subscribers.  The easiest way to create a new PushTopic record is in the Developer Console: under the Debug menu, select “Open Execute Anonymous Window” and paste in Apex code to create a new PushTopic, like:

PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'ContactUpdates';
pushTopic.Query = 'SELECT Id, Name FROM Contact';
pushTopic.ApiVersion = 36.0;
pushTopic.NotifyForOperationCreate = true;
pushTopic.NotifyForOperationUpdate = true;
pushTopic.NotifyForOperationUndelete = true;
pushTopic.NotifyForOperationDelete = true;
pushTopic.NotifyForFields = 'Referenced';
insert pushTopic;

This example creates a PushTopic named “ContactUpdates” and will produce an event every time a Contact is created, updated, undeleted, or deleted.  The Id and Name fields of the Contact record will be sent in the event’s payload.
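When one of those operations occurs, each subscriber receives a JSON event containing the event metadata and the queried fields. Here is a sketch of what a single ContactUpdates event might look like (the field values are invented; the exact shape is described in the Streaming API documentation):

```json
{
  "channel": "/topic/ContactUpdates",
  "data": {
    "event": {
      "createdDate": "2016-05-27T17:38:00.000Z",
      "type": "updated"
    },
    "sobject": {
      "Id": "003D000000QOZpRIAX",
      "Name": "Ada Lovelace"
    }
  }
}
```

The `data.event.type` field tells you which operation fired, and `data.sobject` carries the fields from the PushTopic's SOQL query.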

To consume events from a PushTopic you need a CometD (Bayeux protocol) client.  This example app uses the Java client from cometd.org to consume events in a Scala application.  A very basic event consumer looks like:

// Connect to the Streaming API endpoint via the Bayeux protocol
val bayeuxClient = new BayeuxClient(url, transport)
bayeuxClient.handshake()
bayeuxClient.waitFor(connectionTimeout, BayeuxClient.State.CONNECTED)

// Subscribe to the PushTopic channel and print each event
bayeuxClient.getChannel("/topic/ContactUpdates").subscribe {
  new MessageListener() {
    override def onMessage(channel: ClientSessionChannel, message: Message) {
      println(message)
    }
  }
}

Some additional setup has been omitted here but can be found in the source for this example.
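The omitted setup mostly amounts to constructing the `url` and `transport` used above. Here is a hedged sketch, assuming `instanceUrl` and `accessToken` come from an earlier OAuth login (those names are hypothetical):

```scala
import org.cometd.client.transport.LongPollingTransport
import org.eclipse.jetty.client.HttpClient
import org.eclipse.jetty.client.api.Request

// The CometD transport rides on a Jetty HttpClient
val httpClient = new HttpClient()
httpClient.start()

// Every Streaming API request must carry the OAuth access token
val transport = new LongPollingTransport(null, httpClient) {
  override def customize(request: Request): Unit = {
    request.header("Authorization", s"OAuth $accessToken")
  }
}

// The Streaming API endpoint lives under /cometd/<api version>
val url = s"$instanceUrl/cometd/36.0"
```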

Sending Salesforce Streaming Events to Kafka

Now that you’ve seen how to set up a PushTopic and consumer with the Salesforce Streaming API, we can take those events and send them to Kafka.  This provides the fault-tolerance and scalability inherent in Kafka.  For this example, Reactive Kafka with Scala will be used to pipe the Salesforce events into Kafka.  Reactive Kafka uses Akka Streams, a Java and Scala implementation of Reactive Streams based on Akka.

Akka Streams uses the Source and Sink event paradigm where a Source produces events and a Sink consumes them.
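As a standalone illustration of that paradigm (a sketch, independent of Salesforce or Kafka), here is a minimal pipeline where a Source of numbers runs into a printing Sink:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

// An implicit ActorSystem and materializer are needed to run any stream
implicit val system = ActorSystem("demo")
implicit val materializer = ActorMaterializer()

// The Source emits 1, 2, 3; the Sink prints each element as it arrives
Source(1 to 3).runWith(Sink.foreach(println))
```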

In our example the Source is the Salesforce Streaming API and the Sink is Kafka.  To create a Source we use an Akka Actor that receives messages whenever the Salesforce Streaming API sends them.  Building on the PushTopic example from earlier, here is an Akka Streams Source connected to the Salesforce Streaming API:

// The Source is backed by an actor that publishes whatever it receives
val bayeuxClient = new BayeuxClient(url, transport)
val actorRef = actorSystem.actorOf(Props(new ChannelActor))
val actorPublisher = ActorPublisher(actorRef)
val source = Source.fromPublisher(actorPublisher)

bayeuxClient.handshake()
bayeuxClient.waitFor(connectionTimeout, BayeuxClient.State.CONNECTED)

// Forward each Streaming API message to the actor behind the Source
bayeuxClient.getChannel("/topic/ContactUpdates").subscribe {
  new MessageListener() {
    override def onMessage(channel: ClientSessionChannel, message: Message) {
      actorRef ! message
    }
  }
}

The onMessage function now sends a message (that is what the ! operator does) to the Akka Actor that the Source is connected to.  Check out the full source for this example to see the omitted setup.
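One piece of that omitted setup worth sketching is the ChannelActor itself. A hedged sketch (the real implementation is in the example's source): an ActorPublisher that buffers incoming Streaming API messages and emits them as the downstream demands them.

```scala
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import org.cometd.bayeux.Message

class ChannelActor extends ActorPublisher[Message] {
  private var buffer = Vector.empty[Message]

  override def receive = {
    // A Streaming API message arrives from the MessageListener
    case message: Message =>
      buffer :+= message
      deliver()
    // The stream signals demand for more elements
    case Request(_) =>
      deliver()
    // The stream was cancelled downstream
    case Cancel =>
      context.stop(self)
  }

  // Emit buffered messages only while the stream has outstanding demand
  private def deliver(): Unit = {
    while (totalDemand > 0 && buffer.nonEmpty) {
      onNext(buffer.head)
      buffer = buffer.tail
    }
  }
}
```

Buffering against `totalDemand` is what gives the pipeline backpressure: if Kafka slows down, messages queue in the actor instead of overwhelming the Sink.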

Now that we have a Source that produces events whenever the Streaming API sends them, we need a Sink that will consume the events and send them to Kafka.  The Reactive Kafka library makes this pretty straightforward:

val sink = Producer.plainSink(
  ProducerSettings[String, String](actorSystem, serializer, serializer)
    .withBootstrapServers(kafkaUrls)
)

Check out the full source for this example to see the omitted setup and error handling.
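For reference, the two values the Sink snippet assumes look roughly like this. This is a sketch; the KAFKA_URL parsing in particular is an assumption about the format of Heroku's config var:

```scala
import org.apache.kafka.common.serialization.StringSerializer

// The same string serializer is used for both record keys and values
val serializer = new StringSerializer()

// Heroku Kafka exposes the broker list via the KAFKA_URL config var as
// comma-separated kafka+ssl:// URLs; the Kafka client wants bare
// host:port pairs (this parsing is a simplification)
val kafkaUrls = sys.env("KAFKA_URL")
  .split(",")
  .map(_.stripPrefix("kafka+ssl://").stripPrefix("kafka://"))
  .mkString(",")
```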

The final step is to wire the Salesforce Source to the Kafka Sink:

def messageToProducerRecord(message: Message) = {
  logger.debug("Got message: " + message.getJSON)
  new ProducerRecord[String, String]("ContactUpdates", message.getJSON)
}

salesforceSource.map(messageToProducerRecord).to(kafkaSink).run()

The map(messageToProducerRecord) transforms the incoming Message from Salesforce into an outgoing ProducerRecord for Kafka.  Check out the full source for this example.

Running the Example App and Further Learning

You can get this full example running on Heroku with Heroku Kafka by following the instructions in the project’s README.  There are also instructions for running the code locally.

To learn more about Heroku Kafka, check out the Dev Center docs.  Let us know how it goes!

