package org.mule.extension.salesforce.internal.source;

import com.sforce.ws.tools.wsdlc;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeSet;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.common.HashMapMessage;
import org.mule.extension.helpers.logger.ConnectorLogger;
import org.mule.extension.helpers.logger.ConnectorLoggerImpl;
import org.mule.extension.salesforce.api.stream.ReplayOption;
import org.mule.extension.salesforce.internal.connection.ForceWSCConnection;
import org.mule.extension.salesforce.internal.error.SalesforceErrorType;
import org.mule.extension.salesforce.internal.error.exception.service.ExceptionMessages;
import org.mule.extension.salesforce.internal.error.exception.service.SalesforceException;
import org.mule.extension.salesforce.internal.service.streaming.BayeuxParameters;
import org.mule.extension.salesforce.internal.service.streaming.SessionControl;
import org.mule.extension.salesforce.internal.service.streaming.StreamingClient;
import org.mule.extension.salesforce.internal.service.streaming.persistence.StreamingObjectStore;
import org.mule.extension.salesforce.internal.service.streaming.transport.BayeuxClientEventsHolder;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.extension.api.annotation.execution.OnError;
import org.mule.runtime.extension.api.annotation.execution.OnSuccess;
import org.mule.runtime.extension.api.annotation.execution.OnTerminate;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.extension.api.runtime.source.SourceResult;

/* loaded from: input_file:repository/com/mulesoft/connectors/mule-salesforce-connector/10.18.2/mule-salesforce-connector-10.18.2-mule-plugin.jar:org/mule/extension/salesforce/internal/source/AbstractStreamingSourceWithReplay.class */
public abstract class AbstractStreamingSourceWithReplay extends AbstractStreamingSource implements SessionControl, ClientSessionChannel.MessageListener {
    /** Logger shared by all instances of this source. */
    private static final ConnectorLogger logger = ConnectorLoggerImpl.newInstance(AbstractStreamingSourceWithReplay.class);
    /** Format string for the "starting with replayId" log detail; its single argument is the replay id. */
    private static final String STARTING_WITH_REPLAY_ID_MESSAGE = "Starting with replayId: %s.";
    /** Key name for the replay id; same text as the key used for event fields and callback-context variables. */
    private static final String REPLAY_ID = "replayId";
    /** Key name for the channel; same text as the key used for message attributes and callback-context variables. */
    private static final String CHANNEL = "channel";
    /** Store of processed/failed replay ids; assigned in onStart only when the subscribe params require ObjectStore interaction. */
    private StreamingObjectStore objectStore;
    /** Replay id of the last in-memory cached event handled by processExistingMessages; stays null until one is handled. */
    private Long replayIdFromCache;

    // Connector parameter: when true, events cached in BayeuxClientEventsHolder are drained on start
    // (see processExistingMessages) and the flag is forwarded to the streaming client on creation.
    @Optional
    @Parameter
    @Summary("When processing flow is slow cache events in memory to reduce quota consumption")
    @Placement(tab = "Advanced", order = 1)
    private boolean cacheEventsInMemory;

    /**
     * Starts the source.
     *
     * <p>In order: stores the callback, resolves the subscribe parameters, obtains a connection,
     * creates the streaming object store when ObjectStore interaction is required, creates the
     * streaming client, drains cached messages, checks limits, and finally starts the streaming
     * client from the resolved replay id.
     *
     * @param sourceCallback callback used to push messages into the flow
     * @throws MuleException if starting fails
     */
    @Override
    public void onStart(SourceCallback<Object, Serializable> sourceCallback) throws MuleException {
        logger.debug("Starting the source with replay");

        this.sourceCallback = sourceCallback;
        this.subscribeParams = getSubscribeParams();
        this.connection = (ForceWSCConnection) this.connectionProvider.connect();
        logger.info(
                "Connection has been obtained",
                "Logging hash and sessionId",
                () -> ConnectorLoggerImpl.quickMap(wsdlc.CONNECTION, this.connection, "sessionId", this.connection.getSessionId()));

        if (this.subscribeParams.isObjectStoreInteractionRequired()) {
            this.objectStore = this.connection.createStreamingObjectStore(this);
        }

        this.streamingClient = createStreamingClient();
        // Cached events are handled before the replay id is resolved, because handling them
        // updates the replay id that getStartReplayId() may return.
        processExistingMessages();
        super.checkLimits();

        long replayIdToStartFrom = getStartReplayId();
        logger.info("Starting StreamingClient", "Start replayId from source config = " + replayIdToStartFrom);
        this.streamingClient.start(replayIdToStartFrom);

        logger.debug(
                "The source onStart() method has finished",
                () -> ConnectorLoggerImpl.quickMap(wsdlc.CONNECTION, this.connection, "source", this));
    }

    /**
     * Stops the source.
     *
     * <p>Raises the interrupt flag for the topic when in-memory caching is enabled, delegates to
     * the parent {@code onStop()}, and then, when ObjectStore interaction is required, unlinks the
     * streaming object store from the connection if a connection is present. A failure while
     * unlinking is logged and not rethrown.
     */
    @Override
    public void onStop() {
        logger.debug("Stopping the source with replay");

        if (this.cacheEventsInMemory) {
            // Flag read by the cached-event loop in processExistingMessages
            // (presumably under the same key as the channel name - confirm).
            BayeuxClientEventsHolder.getInstance().setInterruptLoopFlag(this.subscribeParams.getTopic(), true);
        }

        super.onStop();

        if (!this.subscribeParams.isObjectStoreInteractionRequired()) {
            return;
        }
        try {
            java.util.Optional.ofNullable(this.connection)
                    .ifPresent(activeConnection -> activeConnection.unlinkStreamingObjectStore(this));
        } catch (Exception unlinkFailure) {
            logger.error(
                    "unlinking the StreamingObjectStore",
                    unlinkFailure,
                    () -> ConnectorLoggerImpl.quickMap(wsdlc.CONNECTION, this.connection));
        }
    }

    /**
     * Drains the events cached in memory for this source's channel and dispatches them through
     * {@link #onMessage} before the streaming client subscribes.
     *
     * <p>Does nothing unless {@code cacheEventsInMemory} is enabled. The interrupt flag for the
     * topic is cleared first; the loop then pops messages until the holder has none left for the
     * channel or the loop is flagged as interrupted. A popped message is dispatched only when its
     * channel equals the subscribed topic and it carries an event map with a non-null
     * {@code replayId}; any other message is discarded. The replay id of each dispatched message
     * is remembered in {@code replayIdFromCache}.
     *
     * @throws NumberFormatException if a cached replayId value is not a parseable long
     */
    protected void processExistingMessages() {
        if (!this.cacheEventsInMemory) {
            return;
        }
        BayeuxClientEventsHolder holder = BayeuxClientEventsHolder.getInstance();
        holder.setInterruptLoopFlag(this.subscribeParams.getTopic(), false);

        String channelName = this.streamingClient.getChannelName();
        if (!holder.hasMessages(channelName)) {
            return;
        }
        logger.debug(String.format("Before subscribing process events cached in-memory for channel %s - %s events", channelName, Integer.valueOf(holder.size(channelName))));

        while (holder.hasMessages(channelName) && !holder.isLoopInterrupted(channelName).get()) {
            Message.Mutable cachedMessage = holder.popMessage(channelName);
            if (!getSubscribeParams().getTopic().equals(cachedMessage.getChannel())) {
                continue;
            }
            Map<String, Object> data = cachedMessage.getDataAsMap();
            if (data == null || data.isEmpty()) {
                continue;
            }
            // Check the payload type instead of casting blindly, so that a malformed cached
            // message is skipped rather than aborting the start of the source.
            Object event = data.get(BayeuxParameters.EVENT);
            if (!(event instanceof Map)) {
                continue;
            }
            Object rawReplayId = ((Map<?, ?>) event).get("replayId");
            if (rawReplayId == null) {
                continue;
            }
            this.replayIdFromCache = Long.valueOf(Long.parseLong(rawReplayId.toString()));
            // No ClientSessionChannel is available for cached messages.
            onMessage(null, cachedMessage);
        }
    }

    /**
     * Callback invoked when the flow finished successfully for a message.
     *
     * <p>When ObjectStore interaction is required and the context carries both the
     * {@code channel} and {@code replayId} variables, the replay id is removed from the stored
     * failed ids and, if it is greater than the stored highest processed id (or none is stored),
     * recorded as the new highest processed id. Any exception raised while doing so is logged and
     * not rethrown.
     *
     * @param sourceCallbackContext context created in {@code onMessage} for the processed message
     */
    @OnSuccess
    public synchronized void onSuccess(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Finished the flow successfully");
        if (!isPostProcessingEventStorageNeeded(sourceCallbackContext, "(onSuccess)")) {
            return;
        }
        try {
            // Message fixed from "ReplyId" to "ReplayId" to match the wording used by onError.
            Long replayId = (Long) sourceCallbackContext.getVariable("replayId")
                    .orElseThrow(() -> new SalesforceException("ReplayId not found on message"));
            String channel = (String) sourceCallbackContext.getVariable("channel")
                    .orElseThrow(() -> new SalesforceException("Topic Name not found on message"));
            logger.debug("Processing succeeded", () -> ConnectorLoggerImpl.quickMap("replayId", replayId, "channel", channel));

            if (this.objectStore.deleteFailedEventId(channel, replayId.longValue())) {
                logger.debug("Failed replay Id deleted from list", () -> ConnectorLoggerImpl.quickMap("replayId", replayId, "channel", channel));
            }

            java.util.Optional<Long> highestProcessed = this.objectStore.getHighestProcessedEventId(channel);
            // Only ever move the highest processed id forward.
            if (!highestProcessed.isPresent() || highestProcessed.get().longValue() < replayId.longValue()) {
                logger.debug("Adding current processedEventId to the list", () -> ConnectorLoggerImpl.quickMap("replayId", replayId));
                this.objectStore.addHighestProcessedEventId(channel, replayId.longValue());
            }
        } catch (NoSuchElementException missingVariable) {
            logger.error("retrieving context variables in OnSuccess method", missingVariable);
        } catch (Exception storageFailure) {
            logger.error("updating the lowestFailedEventId in the storage layer", storageFailure);
        }
    }

    /**
     * Callback invoked when the flow finished with an error for a message.
     *
     * <p>When ObjectStore interaction is required and the context carries both the
     * {@code channel} and {@code replayId} variables, the replay id is recorded as a failed event
     * id for the channel. Any exception raised while doing so is logged and not rethrown.
     *
     * @param sourceCallbackContext context created in {@code onMessage} for the failed message
     */
    @OnError
    public synchronized void onError(SourceCallbackContext sourceCallbackContext) {
        logger.debug("Finished the flow with error");
        if (!isPostProcessingEventStorageNeeded(sourceCallbackContext, "(onError)")) {
            return;
        }
        try {
            Long failedReplayId = (Long) sourceCallbackContext.getVariable("replayId")
                    .orElseThrow(() -> new SalesforceException("ReplayId not found on message"));
            String topicName = (String) sourceCallbackContext.getVariable("channel")
                    .orElseThrow(() -> new SalesforceException("Topic Name not found on message"));

            logger.debug("Processing with errors", () -> ConnectorLoggerImpl.quickMap("replayId", failedReplayId, "channel", topicName));
            this.objectStore.addFailedEventId(topicName, failedReplayId.longValue());
            logger.debug("Failed replay Id saved", () -> ConnectorLoggerImpl.quickMap("failedEventId", failedReplayId));
        } catch (NoSuchElementException missingVariable) {
            logger.error("retrieving context variables in OnError method.", missingVariable);
        } catch (Exception storageFailure) {
            logger.error("updating the lowestFailedEventId in the storage layer", storageFailure);
        }
    }

    /**
     * Tells whether the success/error callbacks should update the object store.
     *
     * @param sourceCallbackContext context to inspect for the {@code channel} and {@code replayId} variables
     * @param str                   suffix naming the calling callback, used only in the warning text
     * @return {@code true} when ObjectStore interaction is required and both variables are set;
     *         {@code false} otherwise (a warning is logged when only the variables are missing)
     */
    private boolean isPostProcessingEventStorageNeeded(SourceCallbackContext sourceCallbackContext, String str) {
        if (!this.subscribeParams.isObjectStoreInteractionRequired()) {
            return false;
        }
        boolean bothVariablesSet =
                sourceCallbackContext.hasVariable("channel") && sourceCallbackContext.hasVariable("replayId");
        if (!bothVariablesSet) {
            logger.warn(
                    String.format("find context variables %s", str),
                    "context variables might not have been set or have been lost",
                    "the replayId will not be registered in ObjectStore");
        }
        return bothVariablesSet;
    }

    /**
     * Termination callback; only emits a trace entry, there is nothing else to do.
     *
     * @param sourceResult result of the flow execution (not used)
     */
    @OnTerminate
    public void onTerminate(SourceResult sourceResult) {
        final String traceDetail = "onTerminate method - nothing to execute";
        logger.trace(ConnectorLogger.TraceKeywords.ENTERING, traceDetail);
    }

    /**
     * Handles one message received from the streaming channel (or replayed from the in-memory
     * cache) and pushes it into the flow.
     *
     * <ul>
     *   <li>A message that is not a {@link HashMapMessage} is forwarded as-is with its data as output.</li>
     *   <li>A message without a data map, without an event map, or without a non-null
     *       {@code replayId} in that event map is skipped with a warning.</li>
     *   <li>Otherwise the output is the {@code sobject} entry of the data map (or the message itself
     *       when absent) and the attributes are the channel plus every event field. The message is
     *       dropped when {@code isMessageProcessingNeeded} reports it as already processed; if that
     *       check itself fails, the message is still sent to the flow to avoid data loss.</li>
     * </ul>
     *
     * <p>When ObjectStore interaction is required, a callback context carrying the
     * {@code channel} and {@code replayId} variables is attached so that the success/error
     * callbacks can update the store. A failure while handing the message to the flow is logged
     * and reported through {@code onConnectionException}.
     *
     * @param clientSessionChannel channel the message arrived on; not used ({@code null} for cached messages)
     * @param message              the received message
     * @throws NumberFormatException if the replayId value is not a parseable long
     */
    @Override
    public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
        logger.debug("Message received", () -> ConnectorLoggerImpl.quickMap("message", message));
        Result.Builder<Object, Serializable> builder = Result.builder();

        if (!(message instanceof HashMapMessage)) {
            Result<Object, Serializable> passThrough = builder.output(message.getData()).build();
            logger.debug("Processing a message that's not HashMapMessage", () -> ConnectorLoggerImpl.quickMap("message", message));
            this.sourceCallback.handle(passThrough);
            return;
        }

        Map<String, Object> dataAsMap = message.getDataAsMap();
        // Check the payload type instead of casting blindly: a non-map event or a null replayId
        // is treated like a missing field and takes the "skip with warning" path.
        Object event = (dataAsMap == null) ? null : dataAsMap.get(BayeuxParameters.EVENT);
        Object rawReplayId = (event instanceof Map) ? ((Map<?, ?>) event).get("replayId") : null;
        if (rawReplayId == null) {
            logger.warn("send the message for processing", "one or more key fields of the message (data, event or replayId) is missing", "skipping this message", () -> ConnectorLoggerImpl.quickMap("message", message));
            return;
        }
        Map<?, ?> eventFields = (Map<?, ?>) event;
        Long replayId = Long.valueOf(Long.parseLong(rawReplayId.toString()));

        boolean trackInObjectStore = this.subscribeParams.isObjectStoreInteractionRequired();
        SourceCallbackContext callbackContext = null;
        if (trackInObjectStore) {
            callbackContext = this.sourceCallback.createContext();
            callbackContext.addVariable("channel", message.getChannel());
            callbackContext.addVariable("replayId", replayId);
        }

        // The channel goes in first so that an event field with the same key overrides it,
        // exactly as the entry-by-entry copy did.
        HashMap<Object, Object> attributes = new HashMap<>();
        attributes.put("channel", message.getChannel());
        attributes.putAll(eventFields);
        builder.output(dataAsMap.getOrDefault("sobject", message)).attributes(attributes);

        boolean processingNeeded = true;
        try {
            processingNeeded = isMessageProcessingNeeded(message.getChannel(), replayId);
        } catch (Exception checkFailure) {
            logger.warn("find out if message processing is needed", "an exception has occurred", "Will send the message to the flow to avoid data loss", checkFailure, () -> ConnectorLoggerImpl.quickMap("replayId", replayId, "channel", message.getChannel(), "message", message));
        }
        if (!processingNeeded) {
            logger.debug("Message already processed", () -> ConnectorLoggerImpl.quickMap("message", message, "replayId", replayId));
            return;
        }

        logger.debug("Message sent for processing", () -> ConnectorLoggerImpl.quickMap("message", message, "replayId", replayId));
        try {
            if (trackInObjectStore) {
                this.sourceCallback.handle(builder.build(), callbackContext);
            } else {
                this.sourceCallback.handle(builder.build());
            }
        } catch (Exception handlingFailure) {
            logger.error("processing the received message", handlingFailure);
            // Chain the caught exception itself (not only its cause) so its type and stack trace
            // are not lost when the cause is null.
            this.sourceCallback.onConnectionException(new ConnectionException(handlingFailure.getMessage(), handlingFailure));
        }
    }

    /**
     * Creates the streaming client through the current connection, passing this source (twice,
     * as the two {@code this} arguments), the subscribe parameters, the component location and
     * the in-memory caching flag.
     *
     * @return the streaming client created by the connection
     */
    @Override
    protected StreamingClient createStreamingClient() {
        final String location = this.componentLocation.getLocation();
        return this.connection.createStreamingClient(
                this,
                this.subscribeParams,
                this,
                location,
                this.cacheEventsInMemory);
    }

    /**
     * Resolves the replay id the streaming client should start from.
     *
     * <p>Precedence: the replay id of the last cached event (when in-memory caching is enabled and
     * one was handled); then, with automatic replay enabled, the id saved in the object store;
     * finally the id derived from the user-selected replay option.
     *
     * @return the replay id to start from
     */
    @Override
    public synchronized long getStartReplayId() {
        final boolean resumeFromCache = this.cacheEventsInMemory && this.replayIdFromCache != null;
        if (resumeFromCache) {
            return this.replayIdFromCache.longValue();
        }

        if (this.subscribeParams.isAutomaticReplay()) {
            logger.info("Automatic-Replay is enabled", "Searching for replayId in ObjectStore");
            java.util.Optional<Long> storedReplayId = getReplayIdFromObjectStore();
            if (storedReplayId.isPresent()) {
                return storedReplayId.get().longValue();
            }
            logger.info("No saved replayId found in ObjectStore", "Getting the replayId based on user selection");
        }

        return getReplayIdFromReplayOption();
    }

    /**
     * Looks up the replay id saved in the object store for the subscribed topic.
     *
     * <p>The lowest failed event id wins when one is stored; otherwise the highest processed
     * event id is used.
     *
     * @return the stored replay id, or an empty optional when neither value is stored
     */
    private java.util.Optional<Long> getReplayIdFromObjectStore() {
        final String topic = this.subscribeParams.getTopic();

        final java.util.Optional<Long> lowestFailed = this.objectStore.getLowestFailedEventId(topic);
        if (lowestFailed.isPresent()) {
            logger.info("LowestFailedReplayId found in ObjectStore", String.format(STARTING_WITH_REPLAY_ID_MESSAGE, lowestFailed.get()));
            return lowestFailed;
        }

        final java.util.Optional<Long> highestProcessed = this.objectStore.getHighestProcessedEventId(topic);
        if (highestProcessed.isPresent()) {
            logger.info("Last known processed replayId found in ObjectStore", String.format(STARTING_WITH_REPLAY_ID_MESSAGE, highestProcessed.get()));
        }
        return highestProcessed;
    }

    /**
     * Finds the latest replay id known to the object store for the subscribed topic: the larger
     * of the highest failed event id and the highest processed event id, whichever are stored.
     *
     * <p>A stored but empty set of failed ids is treated the same as no set at all; calling
     * {@code TreeSet.last()} on it would throw {@code NoSuchElementException}.
     *
     * @return the latest known replay id, or an empty optional when the store holds none
     */
    private java.util.Optional<Long> getStartReplayIdForResumeLatestOnly() {
        String topic = this.subscribeParams.getTopic();

        java.util.Optional<Long> highestFailed = this.objectStore.getFailedEventIds(topic)
                .filter(failedIds -> !failedIds.isEmpty())
                .map(TreeSet::last);
        java.util.Optional<Long> highestProcessed = this.objectStore.getHighestProcessedEventId(topic);

        java.util.Optional<Long> latestKnown;
        if (highestFailed.isPresent() && highestProcessed.isPresent()) {
            long latest = Math.max(highestFailed.get().longValue(), highestProcessed.get().longValue());
            latestKnown = java.util.Optional.of(Long.valueOf(latest));
        } else if (highestFailed.isPresent()) {
            latestKnown = highestFailed;
        } else {
            latestKnown = highestProcessed;
        }

        if (latestKnown.isPresent()) {
            logger.info("Last known received replayId found in ObjectStore", String.format(STARTING_WITH_REPLAY_ID_MESSAGE, latestKnown.get()));
        } else {
            logger.info("Last known replayId not found in ObjectStore", "Starting with ReplayOption ALL");
        }
        return latestKnown;
    }

    /**
     * Derives the start replay id from the replay option in the subscribe parameters.
     *
     * <ul>
     *   <li>{@code FROM_LAST_REPLAY_ID}: the latest id known to the object store, falling back to
     *       the value of {@code ReplayOption.ALL} when the store holds none.</li>
     *   <li>{@code FROM_REPLAY_ID}: the id entered by the user.</li>
     *   <li>Any other option: the numeric value carried by the option itself.</li>
     * </ul>
     *
     * @return the replay id to start from
     */
    private long getReplayIdFromReplayOption() {
        logger.debug("Getting replayId from the ReplayOption selected by the user");
        switch (this.subscribeParams.getReplayOption()) {
            case FROM_LAST_REPLAY_ID: {
                logger.info("Starting from last known replay id", "Will check ObjectStore");
                java.util.Optional<Long> lastKnown = getStartReplayIdForResumeLatestOnly();
                if (lastKnown.isPresent()) {
                    return lastKnown.get().longValue();
                }
                return Long.parseLong(ReplayOption.ALL.getValue());
            }
            case FROM_REPLAY_ID: {
                logger.info("ReplayId has been specified by the user", "Will use what the user entered");
                return getReplayIdEnteredByUser();
            }
            default: {
                logger.info("No specific replayId specified", String.format("Selected ReplayOption: %s", this.subscribeParams.getReplayOption()));
                return Long.parseLong(this.subscribeParams.getReplayOption().getValue());
            }
        }
    }

    /**
     * Evaluates the replay-id expression from the subscribe parameters and converts the result
     * to a replay id.
     *
     * <p>A {@code String} result is parsed; an integral boxed number ({@code Integer},
     * {@code Long}, {@code Short} or {@code Byte}) is accepted directly. {@code Long} is accepted
     * because replay ids are handled as {@code long} throughout this class, so an expression that
     * evaluates to a {@code Long} must not be rejected as "not an integer". The value is then
     * validated by {@link #getReplayIdFromString(String)}.
     *
     * @return the validated replay id
     * @throws ModuleException if the evaluated value is of any other type (including {@code null}),
     *                         is not a parseable number, or is rejected by the validation
     */
    private long getReplayIdEnteredByUser() {
        Object value = this.expressionManager.evaluate(this.subscribeParams.getReplayId()).getValue();

        String replayIdText;
        if (value instanceof String) {
            replayIdText = (String) value;
        } else if (value instanceof Integer || value instanceof Long || value instanceof Short || value instanceof Byte) {
            replayIdText = Long.toString(((Number) value).longValue());
        } else {
            throw new ModuleException(ExceptionMessages.REPLAY_ID_MUST_BE_INTEGER, SalesforceErrorType.SOURCE, new SalesforceException(ExceptionMessages.REPLAY_ID_MUST_BE_INTEGER));
        }

        long replayId = getReplayIdFromString(replayIdText).longValue();
        logger.debug(String.format("User specified replayId: %s", Long.valueOf(replayId)));
        return replayId;
    }

    /**
     * Parses and validates a replay id given as text.
     *
     * @param str textual replay id
     * @return the parsed replay id
     * @throws ModuleException with {@code REPLAY_ID_MUST_BE_INTEGER} when {@code str} is not a
     *                         parseable long, or with {@code INVALID_REPLAY_ID} when the parsed
     *                         value is lower than -2 (presumably -1 and -2 are the special replay
     *                         options - confirm against ReplayOption)
     */
    public static Long getReplayIdFromString(String str) {
        long candidate;
        try {
            candidate = Long.parseLong(str);
        } catch (NumberFormatException notANumber) {
            throw new ModuleException(ExceptionMessages.REPLAY_ID_MUST_BE_INTEGER, SalesforceErrorType.SOURCE, notANumber);
        }
        if (candidate < -2) {
            throw new ModuleException(ExceptionMessages.INVALID_REPLAY_ID, SalesforceErrorType.SOURCE, new SalesforceException(ExceptionMessages.INVALID_REPLAY_ID));
        }
        return Long.valueOf(candidate);
    }

    /**
     * Decides whether a received message still has to be sent to the flow.
     *
     * @param str channel the message arrived on
     * @param l   replay id of the message
     * @return {@code true} when ObjectStore interaction is not required, when no highest processed
     *         id is stored for the channel, when the replay id is greater than that stored id, or
     *         when the replay id is among the stored failed ids; {@code false} otherwise
     */
    private boolean isMessageProcessingNeeded(String str, Long l) {
        if (!this.subscribeParams.isObjectStoreInteractionRequired()) {
            return true;
        }

        java.util.Optional<Long> highestProcessed = this.objectStore.getHighestProcessedEventId(str);
        boolean beyondHighestProcessed =
                !highestProcessed.isPresent() || l.longValue() > highestProcessed.get().longValue();
        if (beyondHighestProcessed) {
            return true;
        }

        // At or below the highest processed id: only a previously failed event is replayed.
        return this.objectStore.getFailedEventIds(str)
                .map(failedIds -> failedIds.contains(l))
                .orElse(false);
    }
}
