package org.mule.extension.salesforce.internal.source;

import com.sun.xml.bind.v2.runtime.reflect.opt.Const;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.common.HashMapMessage;
import org.mule.extension.helpers.logger.ConnectorLogger;
import org.mule.extension.helpers.logger.ConnectorLoggerImpl;
import org.mule.extension.salesforce.api.param.ReadTimeoutParams;
import org.mule.extension.salesforce.api.stream.ReplayOption;
import org.mule.extension.salesforce.internal.connection.ForceWSCConnection;
import org.mule.extension.salesforce.internal.connection.SalesforceConnection;
import org.mule.extension.salesforce.internal.error.SalesforceErrorType;
import org.mule.extension.salesforce.internal.operation.util.QueryJobPagingDelegate;
import org.mule.extension.salesforce.internal.service.streaming.BayeuxParameters;
import org.mule.extension.salesforce.internal.service.streaming.SessionControl;
import org.mule.extension.salesforce.internal.service.streaming.StreamingClient;
import org.mule.extension.salesforce.internal.service.transport.transformer.JsonInputStreamToMap;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.extension.api.annotation.param.Connection;
import org.mule.runtime.extension.api.annotation.param.Optional;
import org.mule.runtime.extension.api.annotation.param.Parameter;
import org.mule.runtime.extension.api.annotation.param.display.DisplayName;
import org.mule.runtime.extension.api.annotation.param.display.Placement;
import org.mule.runtime.extension.api.annotation.param.display.Summary;
import org.mule.runtime.extension.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.runtime.extension.api.exception.ModuleException;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.http.api.HttpConstants;

/* loaded from: input_file:repository/com/mulesoft/connectors/mule-salesforce-connector/10.18.2/mule-salesforce-connector-10.18.2-mule-plugin.jar:org/mule/extension/salesforce/internal/source/AbstractStreamingSource.class */
public abstract class AbstractStreamingSource extends Source<Object, Serializable> implements SessionControl, ClientSessionChannel.MessageListener {
    private static final ConnectorLogger logger = ConnectorLoggerImpl.newInstance(AbstractStreamingSource.class);
    protected static final String MESSAGE = "message";
    protected static final String CONNECTION_MESSAGE = "connection";

    @Optional
    @Parameter
    @Summary("Throw exception when Organization Daily Limit Exceeded occurs")
    @Placement(tab = "Advanced", order = 3)
    @DisplayName("Throw Exception When Organization Daily Limit Exceeded")
    protected boolean throwExceptionWhenOrganizationDailyLimitExceeded;

    @Inject
    protected ExpressionManager expressionManager;
    protected ComponentLocation componentLocation;
    private Map<String, BayeuxClient> bayeuxClientsMap = new ConcurrentHashMap();

    @Connection
    protected ConnectionProvider<SalesforceConnection> connectionProvider;
    protected ForceWSCConnection connection;
    protected SubscribeParams subscribeParams;
    protected SourceCallback<Object, Serializable> sourceCallback;
    protected StreamingClient streamingClient;

    public void onStart(SourceCallback<Object, Serializable> sourceCallback) throws MuleException {
        logger.debug("Source started");
        this.sourceCallback = sourceCallback;
        this.subscribeParams = getSubscribeParams();
        this.connection = (ForceWSCConnection) this.connectionProvider.connect();
        logger.info("Connection has been obtained", "Logging hash and sessionId", () -> {
            return ConnectorLoggerImpl.quickMap("connection", this.connection, "sessionId", this.connection.getSessionId());
        });
        checkLimits();
        long startReplayId = getStartReplayId();
        logger.info("Starting StreamingClient", "Start replayId from source config = " + startReplayId);
        this.streamingClient = createStreamingClient();
        this.streamingClient.start(startReplayId);
    }

    protected StreamingClient createStreamingClient() {
        return this.connection.createStreamingClient(this, this.subscribeParams, this, this.componentLocation.getLocation(), false);
    }

    public void onStop() {
        logger.debug("Source stopped", () -> {
            return ConnectorLoggerImpl.quickMap("source", this);
        });
        java.util.Optional.ofNullable(this.streamingClient).ifPresent((v0) -> {
            v0.stop();
        });
        logger.debug("Disconnecting connection", () -> {
            return ConnectorLoggerImpl.quickMap("connection", this.connection);
        });
        this.connectionProvider.disconnect(this.connection);
    }

    @Override // org.cometd.bayeux.client.ClientSessionChannel.MessageListener
    public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
        logger.debug(String.format("Message received: %s.", message));
        Result.Builder builder = Result.builder();
        if (!(message instanceof HashMapMessage)) {
            Result build = builder.output(message.getData()).build();
            logger.debug("Processing a message that's not HashMapMessage", () -> {
                return ConnectorLoggerImpl.quickMap("message", message);
            });
            this.sourceCallback.handle(build);
            return;
        }
        Map<String, Object> dataAsMap = message.getDataAsMap();
        if (dataAsMap == null || dataAsMap.size() == 0 || !dataAsMap.containsKey(BayeuxParameters.EVENT) || !((Map) dataAsMap.get(BayeuxParameters.EVENT)).containsKey(BayeuxParameters.REPLAY_ID)) {
            logger.warn("send the message for processing", "one or more key fields of the message (data, event or replayId) is missing", "skipping this message", () -> {
                return ConnectorLoggerImpl.quickMap("message", message);
            });
            return;
        }
        Map map = (Map) dataAsMap.get(BayeuxParameters.EVENT);
        Long valueOf = Long.valueOf(Long.parseLong(map.get(BayeuxParameters.REPLAY_ID).toString()));
        HashMap hashMap = new HashMap();
        hashMap.put("channel", message.getChannel());
        for (Map.Entry entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue());
        }
        builder.output(dataAsMap.getOrDefault("sobject", message)).attributes(hashMap);
        try {
            logger.debug("Message sent for processing", () -> {
                return ConnectorLoggerImpl.quickMap("message", message, BayeuxParameters.REPLAY_ID, valueOf);
            });
            this.sourceCallback.handle(builder.build());
        } catch (Exception e) {
            logger.error("Failed to process received message", e);
            this.sourceCallback.onConnectionException(new ConnectionException(e.getMessage(), e.getCause()));
        }
    }

    @Override // org.mule.extension.salesforce.internal.service.streaming.SessionControl
    public String getSessionIdFromConnection() {
        return this.connection.getSessionId();
    }

    @Override // org.mule.extension.salesforce.internal.service.streaming.SessionControl
    public void triggerSourceRestart(boolean z, String str) {
        logger.info("The streaming client needs to restart the source", "Notifying the runtime", () -> {
            return ConnectorLoggerImpl.quickMap("isNewConnectionNeeded", Boolean.valueOf(z), "errorMessage", str);
        });
        this.sourceCallback.onConnectionException(z ? new ConnectionException(new AccessTokenExpiredException(this.connection.getResourceOwnerId()), this.connection) : new ConnectionException(str));
    }

    @Override // org.mule.extension.salesforce.internal.service.streaming.SessionControl
    public void addNewClientToBayeuxClientsMap(String str, BayeuxClient bayeuxClient) {
        this.bayeuxClientsMap.put(str, bayeuxClient);
    }

    @Override // org.mule.extension.salesforce.internal.service.streaming.SessionControl
    public boolean clientAlreadyExistsInBayeuxClientsMap(String str) {
        return this.bayeuxClientsMap.containsKey(str);
    }

    @Override // org.mule.extension.salesforce.internal.service.streaming.SessionControl
    public BayeuxClient getBayeuxClientFromBayeuxClientsMap(String str) {
        return this.bayeuxClientsMap.get(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract SubscribeParams getSubscribeParams();

    protected Map<String, String> getDefaultHttpHeaders() {
        HashMap hashMap = new HashMap();
        hashMap.put("Content-Type", "application/json; charset=UTF-8");
        hashMap.put("Accept", "application/json");
        hashMap.put("Authorization", "Bearer " + this.connection.getSessionId());
        return hashMap;
    }

    public long getStartReplayId() {
        return Long.parseLong(ReplayOption.ONLY_NEW.getValue());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkLimits() {
        if (this.throwExceptionWhenOrganizationDailyLimitExceeded) {
            logger.info("This source has the 'Throw Exception When Organization Daily Limit Exceeded' option enabled", "Checking limits");
            Map<String, Object> transform = new JsonInputStreamToMap().transform(this.connection.send(this.connection.getBaseUrl() + QueryJobPagingDelegate.PageIterator.SERVICES_DATA_V + this.connection.getApiVersion() + "/limits", HttpConstants.Method.GET, null, getDefaultHttpHeaders(), null, new ReadTimeoutParams(5, TimeUnit.SECONDS)).getEntity().getContent());
            if ((this.subscribeParams.getTopic().toLowerCase().startsWith("/topic/") && transform.get("DailyDurableStreamingApiEvents") != null && ((Double) ((Map) transform.get("DailyDurableStreamingApiEvents")).get("Remaining")).doubleValue() <= Const.default_value_double) || (((this.subscribeParams.getTopic().toLowerCase().startsWith("/event/") || this.subscribeParams.getTopic().toLowerCase().startsWith("/data/")) && transform.get("DailyDeliveredPlatformEvents") != null && ((Double) ((Map) transform.get("DailyDeliveredPlatformEvents")).get("Remaining")).doubleValue() <= Const.default_value_double) || (this.subscribeParams.getTopic().toLowerCase().startsWith("/u/") && transform.get("DailyDurableGenericStreamingApiEvents") != null && ((Double) ((Map) transform.get("DailyDurableGenericStreamingApiEvents")).get("Remaining")).doubleValue() <= Const.default_value_double))) {
                throw new ModuleException("Organization total events daily limit exceeded", SalesforceErrorType.LIMIT_EXCEEDED);
            }
            logger.info("Limits look good", "Continuing source startup");
        }
    }
}
