Process Platform Event Messages in Smaller Batches in Apex Triggers
Processing fewer event messages per trigger invocation makes your trigger less likely to hit Apex governor limits. The maximum batch size of a platform event trigger is 2,000 event messages, while the maximum batch size of an Apex object trigger is 200 records. Platform event triggers are therefore more likely to reach limits and can benefit from this feature.
To set a checkpoint for trigger resumption, set the replay ID of the last successfully processed event message using this method call.
EventBus.TriggerContext.currentContext().setResumeCheckpoint(replayId);
When the trigger stops its flow of execution, either intentionally or because of an unhandled exception, such as a limit exception, it fires again with a new batch (the sObject list in Trigger.New). The new batch starts with the event message after the one with the replay ID that you set. The setResumeCheckpoint(replayId) method doesn’t cause the trigger execution to stop, but you can end the execution explicitly. For example, to control the batch size, end the execution flow after some event messages are processed.
The method throws an EventBus.InvalidReplayIdException if the supplied replay ID is not valid, that is, if the replay ID isn’t in the current trigger batch of event messages in the Trigger.new list.
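The exception described above can be handled defensively. The following is a minimal sketch, not a definitive pattern: the trigger name SafeCheckpointTrigger is hypothetical, and the sketch assumes that EventBus.InvalidReplayIdException can be caught like any other Apex exception. A replay ID taken directly from an event in Trigger.new is expected to be valid, so the catch block matters mainly when the replay ID comes from somewhere else in your code.

```apex
// Hypothetical trigger name, used for illustration only.
trigger SafeCheckpointTrigger on Low_Ink__e (after insert) {
    for (Low_Ink__e event : Trigger.new) {
        // Process the event message (placeholder).
        // ...

        try {
            // Mark this event message as the last one successfully processed.
            EventBus.TriggerContext.currentContext().setResumeCheckpoint(event.ReplayId);
        } catch (EventBus.InvalidReplayIdException e) {
            // Assumption: the exception is catchable like other Apex exceptions.
            // The checkpoint was not updated, so log the problem and continue.
            System.debug(LoggingLevel.ERROR,
                'Checkpoint not set; invalid replay ID: ' + e.getMessage());
        }
    }
}
```

Whether to continue, stop, or rethrow after catching the exception depends on how your trigger should behave when a checkpoint can't be set.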
Example
This example trigger sets the replay ID of the last processed event message in each iteration. If a limit exception occurs, the trigger is fired again and resumes processing starting with the event message after the one with the set replay ID.
trigger ResumeEventProcessingTrigger on Low_Ink__e (after insert) {
    for (Low_Ink__e event : Trigger.New) {
        // Process the event message.
        // ...

        // Set the Replay ID of the last successfully processed event message.
        // If a limit is hit, the trigger refires and processing starts with the
        // event after the last one processed (the set Replay ID).
        EventBus.TriggerContext.currentContext().setResumeCheckpoint(event.ReplayId);
    }
}
Example
This example limits the platform event trigger batch size to 200, matching the batch size of Apex object triggers. The trigger counts the event messages that it processes. The setResumeCheckpoint(replayId) method is called in each iteration of the loop, after each event message is successfully processed. When the count exceeds 200, the loop exits and the trigger stops execution. If unprocessed event messages remain, the trigger fires again. The list of event messages sent to the new trigger invocation starts with the event message after the one with the set replay ID.
trigger ControlBatchSizeTrigger on Low_Ink__e (after insert) {
    Integer counter = 0;
    for (Low_Ink__e event : Trigger.New) {
        // Increase batch counter.
        counter++;
        // Only process the first 200 event messages
        if (counter > 200) {
            // Resume after the last successfully processed event message
            // after the trigger stops running.
            // Exit for loop.
            break;
        }

        // Process event message.
        // ....

        // Set Replay ID after which to resume event processing
        // in new trigger execution.
        EventBus.TriggerContext.currentContext().setResumeCheckpoint(
            event.ReplayId);
    }
}
The TestBatchSizeTriggerResumption test class contains a test for the ControlBatchSizeTrigger. The test method in the class publishes 201 event messages. Next, it calls the deliver() method twice to fire the trigger twice. The first invocation processes 200 event messages. The second invocation processes the last event message. The test verifies that the trigger was invoked by inspecting the EventBusSubscriber.Position property, which holds the replay ID of the last processed event message.
@isTest
public class TestBatchSizeTriggerResumption {

    @isTest static void testResumingBatchSizeTrigger() {

        Test.startTest();

        // Publish 201 test events
        List<Low_Ink__e> eventList = new List<Low_Ink__e>();
        for(Integer i=0;i<201;i++) {
            Low_Ink__e oneEvent = new Low_Ink__e(Serial_Number__c='X-' + i);
            eventList.add(oneEvent);
        }
        Database.SaveResult[] srs = EventBus.publish(eventList);
        for(Database.SaveResult sr : srs) {
            System.assertEquals(true, sr.isSuccess());
        }


        // Deliver the first 200 test event messages.
        // This will fire the associated event trigger.
        Test.getEventBus().deliver();

        // Get old position of this subscriber
        EventBusSubscriber subOld =
            [SELECT Name, Position, Topic
             FROM EventBusSubscriber
             WHERE Topic='Low_Ink__e' AND Name='ControlBatchSizeTrigger'];
        System.debug(subOld);

        // Refire the trigger for the last event (201st).
        Test.getEventBus().deliver();

        // VERIFICATION
        // Get new position of this subscriber
        EventBusSubscriber subNew =
            [SELECT Name, Position, Topic
             FROM EventBusSubscriber
             WHERE Topic='Low_Ink__e' AND Name='ControlBatchSizeTrigger'];
        System.debug(subNew);

        System.assertEquals(subOld.Position + 1, subNew.Position);

        Test.stopTest();

    }
}
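As a variation on ControlBatchSizeTrigger, the stopping condition can be tied to governor limit usage instead of a fixed count. The following is a rough sketch under stated assumptions rather than a recommended implementation: the trigger name LimitAwareBatchTrigger and the queryHeadroom margin of 10 are hypothetical, and the sketch assumes that SOQL queries are the limit your processing is most likely to hit. It uses the standard Limits.getQueries() and Limits.getLimitQueries() methods to check how many queries remain.

```apex
// Hypothetical trigger name, used for illustration only.
trigger LimitAwareBatchTrigger on Low_Ink__e (after insert) {
    // Assumed safety margin: stop when fewer than this many SOQL queries remain.
    Integer queryHeadroom = 10;

    for (Low_Ink__e event : Trigger.new) {
        // Stop before processing this event message if the transaction
        // is close to the SOQL query limit.
        if (Limits.getLimitQueries() - Limits.getQueries() < queryHeadroom) {
            // Exit the loop. The trigger refires with the event messages
            // after the last checkpoint that was set.
            break;
        }

        // Process the event message (placeholder).
        // ...

        // Set the replay ID after which to resume processing
        // in the next trigger execution.
        EventBus.TriggerContext.currentContext().setResumeCheckpoint(event.ReplayId);
    }
}
```

The check runs before each event message is processed, so the checkpoint always refers to an event message that was fully handled. Choose the margin so that at least one event message can be processed in every invocation; otherwise the loop exits before any checkpoint is set.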