Retry Long-Lived RPC Calls After an Error Occurs

When an error occurs, a long-lived RPC call, including PublishStream, Subscribe, and ManagedSubscribe (beta), stops running and the Pub/Sub API service closes the streaming connection. To create a new streaming connection, you must retry the RPC call. How you retry the long-lived RPC call depends on the error received in the gRPC StatusRuntimeException, which you can catch in your client.

Because errors are sometimes transient and can go away after a period of time, we recommend that you limit the number of retry attempts and that you retry the RPC calls with exponential backoff. Increasing the time between calls ensures that you don’t keep receiving the same error and exhaust all your retry attempts.

To inspect a Pub/Sub API error code value, get the gRPC StatusRuntimeException Trailers section of the exception by calling getTrailers() on the exception. The Pub/Sub API error code provides information about the cause of the failure. For a list of error codes, see Error Codes. For example, the error code in this example is sfdc.platform.eventbus.grpc.subscription.fetch.replayid.corrupted. It means that the specified Replay ID value is invalid. One possible cause is that it’s an old Replay ID outside the 72-hour retention period.

To retry the Subscribe RPC Method after an error occurs, use a ReplayPreset option to specify from which position in the event bus to resubscribe depending on the error received.

Here are examples of how to retry the Subscribe call based on the error code.

  • If a temporary server error is returned, retry the Subscribe call with the CUSTOM ReplayPreset enum value. Pass in the Replay ID value that you saved from the last processed event to receive events after the one that the client last processed. The subscription resumes from where it left off, after that event with the specified Replay ID.
  • If the Replay ID that you passed in with the CUSTOM ReplayPreset enum value is corrupted, retry the Subscribe call with the LATEST ReplayPreset enum value to receive new events only, or use the EARLIEST option to receive all events that are stored. Because it can slow down performance when a large volume of events is stored in the event bus, use the EARLIEST option sparingly.
  • If you aren’t interested in the events that were missed during client shutdown, retry the Subscribe call by using the LATEST ReplayPreset enum value to get new events.

For more details about the Subscribe call options in the ReplayPreset enum value, see Replaying an Event Stream.

This code snippet shows how to get the error code from the trailers portion of StatusRuntimeException. It then checks for some common error codes and follows a strategy to determine which point in the stream to resubscribe from. This example uses a scheduler to schedule the Subscribe RPC call after a delay, if necessary. The implementation of the scheduler isn’t shown in this example.

The process of retrying the ManagedSubscribe RPC Method (Beta) is simpler than retrying the Subscribe RPC method. If a client retries the ManagedSubscribe call after sending commit requests to store the Replay ID of a processed event on the server, the subscription resumes after the event with the committed Replay ID. The client doesn’t manage the Replay ID’s storage.

Also, the ManagedSubscribe RPC method uses event replay options that you set in the ManagedEventSubscription metadata by using Metadata API or Tooling API. These replay options are used if no Replay ID was committed or the committed Replay ID is invalid.

  • If no Replay ID was committed on the server and you retry the ManagedSubscribe call, the subscription restarts from the value specified in the defaultReplay field of ManagedEventSubscription. If defaultReplay is set to LATEST, the subscription receives new events only. If defaultReplay is set to EARLIEST, the subscription starts from the earliest events stored in the event bus.
  • If the committed Replay ID is invalid and you retry the ManagedSubscribe call, the subscription restarts from the value specified in the errorRecoveryReplay field of ManagedEventSubscription. If errorRecoveryReplay is set to LATEST, the subscription receives new events only. If errorRecoveryReplay is set to EARLIEST, the subscription starts from the earliest events stored in the event bus.

For more information, see ManagedEventSubscription (Beta) in the Metadata API Developer Guide.

The PublishStream RPC Method uses bidirectional streaming. It can send a stream of publish requests while receiving a stream of publish responses from the server. The server returns a PublishResponse for each PublishRequest when publishing is complete for a batch of events. A PublishResponse contains a list of PublishResult objects, each containing the publishing result for one event.

To preserve the order of event batches, wait for the PublishResponse to be received for a published batch of events before sending a new PublishRequest for a new batch. Be aware that this method can slow down the speed of your publishing process.

  1. Publish a batch of events in PublishRequest, and then wait for the PublishResponse to be returned before sending a new PublishRequest.
  2. In PublishResponse, inspect each PublishResult for each event.
  3. If a PublishResult has an error for one or more events, republish the events.
  4. Repeat this process to publish a new batch of events, and then inspect new publish results.

If the order of event batches isn’t important, you can inspect the publish results in each PublishResponse as it’s received. Then republish failed events whose PublishResult contains an error. However, this method can cause one batch to finish publishing before you republish a failed batch of events.

To determine which PublishResult corresponds to which event published, correlate the failed publish result with the event ID (ProducerEvent.Id) by using the correlationKey field that’s returned in PublishResult, which is part of PublishResponse. The correlationKey is set to the value that you set in ProducerEvent.id in PublishRequest when you publish the event. To identify the event published, match PublishResult.correlationKey with ProducerEvent.id (the event ID). You can then republish the event. It’s advisable to limit the number of retries so that your publish calls don’t execute too many times in case the publishing keeps failing.

To view the messages and fields of the pub-sub-api service, see pubsub_api.proto in GitHub. For more information about the correlation key, see Identify an Event with ProducerEvent.id and PublishResponse.correlationKey.

This code snippet shows how to retry publishing failed events. It limits the retries up to a certain number set in the constant MAX_NUM_OF_RETRIES_PER_BATCH value. The example uses a private helper method, validatePublishResponse(), to get a list of PublishResults that have errors (PublishResult.error is true), which represents the failed events. Next, it gets the correlation key from each PublishResult and matches it to a ProducerEvent ID by using a map holding correlation keys and ProducerEvents. Finally, it retries publishing the failed events.

The process of republishing failed events with the unary Publish RPC Method is similar to that of the PublishStream RPC. You can correlate events with a PublishResult by using the correlation key. The only difference between the two RPCs is that Publish RPC has no long-lived connection and the call ends after it executes.