Step 4: Write Code That Subscribes to an Event Channel
Follow the instructions in this step to build a subscriber client.
When you copy the code snippets on this page, make sure that their indentation starts at the comment block below the with grpc.secure_channel statement that you added in Step 2: Build the Python Client. In Python, code blocks are delimited by their indentation.
-
Get the topic name that you want to subscribe to.
The topic format is:
- For a custom platform event:
/event/EventName__e
- For a standard platform event:
/event/EventName
- For a change data capture channel that captures events for all selected entities:
/data/ChangeEvents
- For a change data capture single-entity channel for a standard object:
/data/{StandardObjectName}ChangeEvent
- For a change data capture single-entity channel for a custom object:
/data/{CustomObjectName}__ChangeEvent
- For a change data capture custom channel:
/data/CustomChannelName__chn
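The topic formats listed above can be collected in a small helper. This is a minimal sketch: the `TOPIC_FORMATS` keys and the `topic_name` function are hypothetical names chosen for illustration and are not part of the Pub/Sub API.

```python
# Hypothetical helper: the dictionary keys and function name are illustrative only.
# The format strings mirror the topic formats listed above.
TOPIC_FORMATS = {
    "custom_platform_event": "/event/{name}__e",
    "standard_platform_event": "/event/{name}",
    "all_change_events": "/data/ChangeEvents",
    "standard_object_change_event": "/data/{name}ChangeEvent",
    "custom_object_change_event": "/data/{name}__ChangeEvent",
    "custom_channel": "/data/{name}__chn",
}


def topic_name(kind, name=""):
    """Build a topic name for the given kind of channel.

    `name` is the event, object, or channel name without any suffix.
    It is ignored for the channel that captures all change events.
    """
    return TOPIC_FORMATS[kind].format(name=name)


if __name__ == "__main__":
    print(topic_name("standard_object_change_event", "Opportunity"))
```

For example, `topic_name("standard_object_change_event", "Opportunity")` builds the `/data/OpportunityChangeEvent` topic used later in this step.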
-
Create a generator function to make a FetchRequest stream. After the semaphore is released in the client, the fetchReqStream function acquires the semaphore and sends a FetchRequest. In this FetchRequest, num_requested is the number of events that the client requests from the server. The server can send the requested number of events in one or more FetchResponses, with each FetchResponse containing a batch of events. In this example, num_requested is set to 1.
-
Create a decoding function to decode the payloads of received event messages.
-
Make the subscribe call, and handle received event messages. Decode the payloads of the events with your decoding function. Store the latest replay ID received. You can use the replay ID later to restart a subscription after the last consumed event, if necessary. For more information, see Replaying an Event Stream.
In this example, the event variable represents a FetchResponse. The code example processes one event at a time. When a FetchResponse containing an event is received, the semaphore is released, which results in the fetchReqStream function acquiring the semaphore and sending a new FetchRequest. The new FetchRequest requests one event in num_requested. This flow control works only when requesting and processing one event at a time. For more information about flow control, see Pull Subscription and Flow Control. For more information about semaphores, see Semaphore Objects in the Python documentation.
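The semaphore-based flow control described above can be illustrated without a Salesforce connection. The following is a minimal sketch, not the quick start's actual code: `FakeFetchRequest` and `fake_subscribe` are hypothetical stand-ins for the generated FetchRequest message and the Subscribe call, the replay IDs are made-up byte strings, and `max_requests` exists only so that the demonstration terminates. Only the `threading.Semaphore` handling mirrors the pattern in the text.

```python
import threading
from dataclasses import dataclass


@dataclass
class FakeFetchRequest:
    """Hypothetical stand-in for the generated FetchRequest message."""
    topic_name: str
    num_requested: int


def fetch_req_stream(topic, semaphore, max_requests):
    """Yield one request each time the semaphore can be acquired."""
    for _ in range(max_requests):
        # Blocks until the consumer has released the semaphore,
        # that is, until the previous event has been processed.
        semaphore.acquire()
        yield FakeFetchRequest(topic_name=topic, num_requested=1)


def fake_subscribe(requests):
    """Hypothetical stand-in for the Subscribe call.

    Answers every request with one response that carries one event.
    """
    for n, _request in enumerate(requests):
        yield {"events": [f"event-{n}"], "latest_replay_id": n.to_bytes(8, "big")}


def consume(topic, max_requests=3):
    """Process one event at a time and remember the latest replay ID."""
    # One permit, so the first request can be sent immediately.
    semaphore = threading.Semaphore(1)
    received = []
    latest_replay_id = None
    for response in fake_subscribe(fetch_req_stream(topic, semaphore, max_requests)):
        received.extend(response["events"])
        latest_replay_id = response["latest_replay_id"]
        # Releasing the semaphore lets the generator send the next request.
        semaphore.release()
    return received, latest_replay_id


if __name__ == "__main__":
    print(consume("/data/OpportunityChangeEvent"))
```

Because each request asks for exactly one event and the semaphore is released once per response, at most one request is outstanding at any time. If a request asked for more than one event, a single release per response would no longer match the number of events still expected, which is why the text limits this pattern to one event at a time.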
If you run your code at this point, you don’t receive any event messages unless you or Salesforce publishes an event message.
For change data capture events, make sure that Change Data Capture is tracking the object by selecting the object in Setup, on the Change Data Capture page. For this example, select the Opportunity object. Then make a change to an opportunity so that Salesforce generates an event message.
Salesforce publishes most standard platform events, including real-time event monitoring events, in response to an action in Salesforce. You can publish only the standard events that support the create() call. For more information, see Standard Platform Event Object List in the Platform Events Developer Guide.
If you subscribe to the /data/OpportunityChangeEvent topic and make a change to an opportunity, you receive a change event similar to this event message. This event message is for an opportunity whose Amount field was changed and whose Type field was cleared (set to null).
ChangeEventHeader contains bitmap fields that you must decode before reading their contents. These fields are: nulledFields, changedFields, and diffFields. For more information, see Event Deserialization Considerations.
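To give a rough idea of what decoding a bitmap field involves, here is a minimal sketch under stated assumptions. It assumes that each top-level entry is a hexadecimal string and that bit position i, counted from the least significant bit, refers to the i-th field of the event's Avro schema. Entries containing a hyphen are assumed to describe nested fields and are skipped. The function names and the field order in the example are hypothetical; check Event Deserialization Considerations for the authoritative rules.

```python
def set_bit_positions(hex_bitmap):
    """Return the positions of the set bits in a hex string such as '0x0A'.

    Position 0 is the least significant bit (an assumption of this sketch).
    """
    value = int(hex_bitmap, 16)
    return [pos for pos in range(value.bit_length()) if (value >> pos) & 1]


def decode_top_level_fields(bitmap_entries, schema_field_names):
    """Map the top-level bitmap entries to field names.

    `schema_field_names` is the ordered list of field names taken from
    the event schema. Entries that contain '-' are assumed to refer to
    nested fields and are not handled here.
    """
    names = []
    for entry in bitmap_entries:
        if "-" in entry:
            continue  # nested-field entry; outside the scope of this sketch
        names.extend(schema_field_names[pos] for pos in set_bit_positions(entry))
    return names


if __name__ == "__main__":
    # Hypothetical field order, for illustration only.
    fields = ["ChangeEventHeader", "Name", "Type", "Amount"]
    print(decode_top_level_fields(["0x0A"], fields))
```

With the hypothetical field order above, the bitmap `0x0A` has bits 1 and 3 set, so the sketch reports the `Name` and `Amount` fields.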
You can also publish a custom platform event. The next step shows you how to do that using the Pub/Sub API in Python.