Step 4: Write Code That Subscribes to an Event Channel

Follow the instructions in this step to build a subscriber client.

When you copy the code snippets on this page, make sure that their indentation starts at the level of the comment block below the with grpc.secure_channel statement that you added in Step 2: Build the Python Client. In Python, indentation delimits code blocks, so the snippets must be indented to stay inside the with block.

  1. Get the topic name that you want to subscribe to.

    The topic format is:

    • For a custom platform event: /event/EventName__e
    • For a standard platform event: /event/EventName
    • For a change data capture channel that captures events for all selected entities: /data/ChangeEvents
    • For a change data capture single-entity channel for a standard object: /data/{StandardObjectName}ChangeEvent
    • For a change data capture single-entity channel for a custom object: /data/{CustomObjectName}__ChangeEvent
    • For a change data capture custom channel: /data/CustomChannelName__chn
  2. Create a generator function to make a FetchRequest stream. After the semaphore is released in the client, the fetchReqStream function acquires the semaphore and sends a FetchRequest. In this FetchRequest, num_requested is the number of events that the client requests from the server. This example sets num_requested to 1. The server can deliver the requested number of events in one or more FetchResponses, with each FetchResponse containing a batch of events.

  3. Create a decoding function to decode the payloads of received event messages.

  4. Make the subscribe call, and handle received event messages. Decode the payloads of the events with your decoding function. Store the latest replay ID received. You can use the replay ID later to restart a subscription after the last consumed event, if necessary. For more information, see Replaying an Event Stream.
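The topic formats listed in step 1 can be captured in a small helper. This is a minimal sketch: the function name build_topic_name and the channel kind labels are hypothetical and aren't part of the Pub/Sub API. Only the topic patterns come from the list in step 1.

```python
# Hypothetical helper: the function name and the "kind" labels are
# illustrative only. The path patterns follow the list in step 1.
TOPIC_FORMATS = {
    "custom_event": "/event/{name}__e",
    "standard_event": "/event/{name}",
    "cdc_all": "/data/ChangeEvents",
    "cdc_standard_object": "/data/{name}ChangeEvent",
    "cdc_custom_object": "/data/{name}__ChangeEvent",
    "cdc_custom_channel": "/data/{name}__chn",
}


def build_topic_name(kind, name=""):
    """Return the topic path for a channel kind and an event, object, or channel name."""
    if kind not in TOPIC_FORMATS:
        raise ValueError(f"Unknown channel kind: {kind!r}")
    pattern = TOPIC_FORMATS[kind]
    # Every pattern except the all-entities channel needs a name.
    if "{name}" in pattern and not name:
        raise ValueError(f"Channel kind {kind!r} requires a name")
    return pattern.format(name=name)


mysubtopic = build_topic_name("cdc_standard_object", "Opportunity")
print("Subscribing to " + mysubtopic)
```

For the Opportunity object used later in this step, the helper returns /data/OpportunityChangeEvent.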

In the code for this step, the event variable represents a FetchResponse. The code processes one event at a time. When a FetchResponse that contains an event is received, the client releases the semaphore, which lets the fetchReqStream function acquire it and send a new FetchRequest with num_requested set to 1. This flow control works only when requesting and processing one event at a time. For more information about flow control, see Pull Subscription and Flow Control. For more information about semaphores, see Semaphore Objects in the Python documentation.
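The semaphore handshake described above can be tried without a Salesforce connection. The following is a minimal sketch, not the client code itself: the FetchRequest and FetchResponse dataclasses are stand-ins for the generated protobuf messages, fake_subscribe is a hypothetical replacement for the subscribe call, and the counter-based replay IDs are invented for illustration (real replay IDs are opaque values). The initial semaphore value of 1 is also an assumption, chosen so that the first FetchRequest goes out immediately.

```python
import threading
from dataclasses import dataclass, field


# Stand-ins for the generated protobuf messages (assumption: only the
# attributes used in this sketch are modeled).
@dataclass
class FetchRequest:
    topic_name: str
    num_requested: int


@dataclass
class FetchResponse:
    events: list = field(default_factory=list)
    latest_replay_id: bytes = b""


# One permit available at start, so the first FetchRequest is sent at once.
semaphore = threading.Semaphore(1)


def fetchReqStream(topic):
    """Yield one FetchRequest each time the semaphore can be acquired."""
    while True:
        semaphore.acquire()
        yield FetchRequest(topic_name=topic, num_requested=1)


def fake_subscribe(request_iterator, payloads):
    """Hypothetical stand-in for the subscribe call: answers each
    FetchRequest with a FetchResponse that holds exactly one event."""
    for counter, payload in enumerate(payloads, start=1):
        next(request_iterator)  # waits until the semaphore has a permit
        yield FetchResponse(events=[payload],
                            latest_replay_id=counter.to_bytes(8, "big"))


received = []
latest_replay_id = None
substream = fake_subscribe(fetchReqStream("/data/OpportunityChangeEvent"),
                           ["event-1", "event-2", "event-3"])
for event in substream:  # each event is a FetchResponse
    if event.events:
        semaphore.release()  # lets fetchReqStream send the next request
        received.append(event.events[0])
    latest_replay_id = event.latest_replay_id  # keep the latest replay ID

print("Events received:", received)
```

In a real client, the gRPC library consumes the request generator on a separate thread, which is why a semaphore is used to coordinate it with the loop that handles responses. In this single-threaded sketch, the same acquire and release calls simply alternate.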

If you run your code at this point, you don’t receive any event messages unless you or Salesforce publishes an event message.

For change data capture events, make sure that Change Data Capture is tracking the object by selecting the object in Setup, on the Change Data Capture page. For this example, select the Opportunity object. Then make a change to an opportunity so that Salesforce generates an event message.

Salesforce publishes most standard platform events, including real-time event monitoring events, in response to an action in Salesforce. You can publish only the standard events that support the create() call. For more information, see Standard Platform Event Object List in the Platform Events Developer Guide.

If you subscribe to the /data/OpportunityChangeEvent topic and make a change to an opportunity, you receive a change event similar to this event message. This event message is for an opportunity whose Amount field was changed and whose Type field was cleared (set to null).

ChangeEventHeader contains three bitmap fields that you must decode before reading their contents: nulledFields, changedFields, and diffFields. For more information, see Event Deserialization Considerations.
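To give a sense of what decoding a bitmap field involves, here is a minimal sketch for a single top-level bitmap. It rests on two assumptions that you should check against Event Deserialization Considerations: the bitmap arrives as a hexadecimal string, and bit i, counted from the least significant bit, marks the field at position i in the event schema. The function name fields_from_bitmap, the sample bitmap, and the shortened field list are hypothetical. Bitmaps for nested compound fields aren't handled.

```python
def fields_from_bitmap(bitmap_hex, field_names):
    """Return the field names whose positions are set in a hex bitmap.

    Assumption: bit i, counted from the least significant bit, refers to
    the field at index i of the event schema.
    """
    bits = int(bitmap_hex, 16)  # with base 16, int() accepts a "0x" prefix
    return [name for i, name in enumerate(field_names) if (bits >> i) & 1]


# Hypothetical, shortened field list. In a real client, take the field
# order from the event schema that you retrieve for the event.
schema_fields = ["ChangeEventHeader", "Name", "Amount", "Type", "StageName"]

# 0x0C is binary 1100, so positions 2 and 3 are set.
changed = fields_from_bitmap("0x0C", schema_fields)
print(changed)
```

With this sample input, the sketch reports the Amount and Type fields, which matches the kind of change described for the opportunity example.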

You can also publish a custom platform event. The next step shows you how to do that using the Pub/Sub API in Python.