package org.mule.extension.salesforce.internal.service.streaming;

import com.sforce.ws.wsdl.Constants;
import java.util.HashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.mule.extension.helpers.logger.ConnectorLogger;
import org.mule.extension.helpers.logger.ConnectorLoggerImpl;
import org.mule.extension.salesforce.internal.error.SalesforceErrorType;
import org.mule.extension.salesforce.internal.error.exception.service.ExceptionMessages;
import org.mule.extension.salesforce.internal.logger.ConnectorLoggerMessages;
import org.mule.extension.salesforce.internal.service.streaming.listeners.MessageListenerInfo;
import org.mule.extension.salesforce.internal.service.streaming.listeners.SubscribeListener;
import org.mule.extension.salesforce.internal.service.streaming.replayidtracking.SalesforceReplayExtension;
import org.mule.extension.salesforce.internal.service.streaming.transport.BayeuxClientEventsHolder;
import org.mule.extension.salesforce.internal.service.streaming.transport.BayeuxClientFactory;
import org.mule.extension.salesforce.internal.service.streaming.transport.CustomHttpTransport;
import org.mule.extension.salesforce.internal.service.streaming.transport.HttpClientAdapter;
import org.mule.extension.salesforce.internal.source.SubscribeParams;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.extension.api.exception.ModuleException;

/* loaded from: input_file:repository/com/mulesoft/connectors/mule-salesforce-connector/10.18.2/mule-salesforce-connector-10.18.2-mule-plugin.jar:org/mule/extension/salesforce/internal/service/streaming/StreamingClientImpl.class */
/**
 * {@link StreamingClient} implementation that drives a CometD {@link BayeuxClient} for the single
 * topic described by the {@link SubscribeParams} it was created with.
 *
 * <p>Lifecycle: {@link #start(long)} starts the HTTP client adapter and creates/handshakes a new
 * Bayeux client, {@link #subscribe(Long)} subscribes the topic listener to the topic channel, and
 * {@link #stop()} removes listeners, disconnects the Bayeux client and stops the HTTP client.
 * The {@code running} flag makes {@code start} and {@code stop} no-ops when the client is already
 * in the requested state.
 */
public class StreamingClientImpl implements StreamingClient {
    private static final ConnectorLogger logger = ConnectorLoggerImpl.newInstance(StreamingClientImpl.class);

    /**
     * Static lock shared by every instance; guards the "disconnect the previously registered client,
     * create the new one and register it in the session control" sequence in {@link #bayeuxStart(String)}.
     */
    private static final Lock BAYEUX_CLIENT_CREATION_LOCK = new ReentrantLock();

    /** Key used for the Bayeux client in structured log entries. */
    private static final String CLIENT_MESSAGE = "client";
    private static final long CONNECTING_TIMEOUT = 5000;
    /** Timeout passed to {@link BayeuxClient#disconnect(long)}. */
    private static final long DISCONNECTING_TIMEOUT = 1000;
    /** Placeholder printed in log messages when a value cannot be determined. */
    private static final String NOT_AVAILABLE = "N/A";

    private final BayeuxParameters bayeuxParameters;
    private final SessionControl sessionControl;
    private final HttpClientAdapter httpClientAdapter;
    private final SubscribeParams subscribeParams;
    private final AtomicBoolean running = new AtomicBoolean(false);
    /** Listeners materialised from {@link #listenerInfoSuppliers} on {@link #start(long)}. */
    private final Set<MessageListenerInfo> listenerInfos = new CopyOnWriteArraySet<>();
    /** Pairs of (channel name, factory producing the listener for this client). */
    private final Set<Pair<String, Function<StreamingClient, ClientSessionChannel.MessageListener>>> listenerInfoSuppliers;
    private final ClientSessionChannel.MessageListener topicListener;
    private final SalesforceReplayExtension replayExtension;
    private final BayeuxClientFactory bayeuxClientFactory;
    private final boolean cacheEventsInMemory;
    /** Current Bayeux client; {@code null} until started and again after {@link #stop()}. */
    private BayeuxClient client;

    /**
     * Creates a streaming client. When {@code cacheEventsInMemory} is {@code true} a message holder
     * is created in {@link BayeuxClientEventsHolder} for the topic of {@code subscribeParams}.
     *
     * @param httpClientAdapter     adapter started/stopped with this client and used to build the transport
     * @param bayeuxParameters      endpoint, username and long-polling options for the Bayeux client
     * @param sessionControl        provides the session id and the map of Bayeux clients per topic
     * @param subscribeParams       subscription parameters; its topic is used as the channel name
     * @param listenerInfoSuppliers (channel name, listener factory) pairs applied on {@link #start(long)}
     * @param topicListener         listener subscribed to the topic channel by {@link #subscribe(Long)}
     * @param replayExtension       extension tracking replay ids, added to every created Bayeux client
     * @param bayeuxClientFactory   factory used to create the Bayeux client
     * @param cacheEventsInMemory   value returned by {@link #isCacheEventsInMemory()}
     */
    public StreamingClientImpl(HttpClientAdapter httpClientAdapter, BayeuxParameters bayeuxParameters, SessionControl sessionControl, SubscribeParams subscribeParams, Set<Pair<String, Function<StreamingClient, ClientSessionChannel.MessageListener>>> listenerInfoSuppliers, ClientSessionChannel.MessageListener topicListener, SalesforceReplayExtension replayExtension, BayeuxClientFactory bayeuxClientFactory, boolean cacheEventsInMemory) {
        this.bayeuxParameters = bayeuxParameters;
        this.sessionControl = sessionControl;
        this.subscribeParams = subscribeParams;
        this.listenerInfoSuppliers = listenerInfoSuppliers;
        this.httpClientAdapter = httpClientAdapter;
        this.topicListener = topicListener;
        this.replayExtension = replayExtension;
        this.bayeuxClientFactory = bayeuxClientFactory;
        this.cacheEventsInMemory = cacheEventsInMemory;
        if (cacheEventsInMemory) {
            BayeuxClientEventsHolder.getInstance().createMessageHolderForStreamingClient(subscribeParams.getTopic());
        }
    }

    @Override
    public boolean isCacheEventsInMemory() {
        return this.cacheEventsInMemory;
    }

    /** @return the topic configured in the subscribe parameters */
    @Override
    public String getChannelName() {
        return this.subscribeParams.getTopic();
    }

    /**
     * Starts the client unless it is already running: stores the start replay id for the topic,
     * builds the listeners from the configured suppliers, starts the HTTP client and finally
     * creates and handshakes the Bayeux client.
     *
     * <p>Note: the running flag is set before the HTTP and Bayeux clients are started and is not
     * reset here if either of them throws.
     *
     * @param startReplayId replay id handed to the replay extension for the topic
     * @throws ModuleException if the HTTP client cannot be started
     */
    @Override
    public void start(long startReplayId) {
        logger.debug("StreamingClient starting", () -> {
            return ConnectorLoggerImpl.quickMap("startReplayId", Long.valueOf(startReplayId));
        });
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        this.replayExtension.setStartReplayId(this.subscribeParams.getTopic(), startReplayId);
        for (Pair<String, Function<StreamingClient, ClientSessionChannel.MessageListener>> supplier : this.listenerInfoSuppliers) {
            addListener(supplier.getFirst(), supplier.getSecond().apply(this));
        }
        httpStart();
        bayeuxStart(this.sessionControl.getSessionIdFromConnection());
    }

    /**
     * Subscribes the topic listener to the topic channel of the current Bayeux client.
     * If there is no Bayeux client at the moment of the call, nothing is subscribed.
     *
     * @param replayId when non-null, overwrites the replay id tracked for the topic before subscribing
     * @throws ModuleException if the client is not running, or if the topic channel already has subscribers
     */
    @Override
    public void subscribe(Long replayId) {
        final String topic = this.subscribeParams.getTopic();
        final Optional<Long> trackedReplayId = this.replayExtension.getReplayId(topic);
        // Single read of the mutable field: bayeuxStop() sets it to null, so re-reading it between
        // the null check and the dereference could fail.
        final BayeuxClient currentClient = this.client;

        logger.info("Subscribing to channel " + topic, describeSubscription(currentClient, topic, replayId, trackedReplayId));

        if (!this.running.get()) {
            logger.warn("subscribe", "the streaming client is not in the correct state", ConnectorLoggerMessages.AN_EXCEPTION_WILL_BE_THROWN);
            String notStarted = String.format(ExceptionMessages.CONNECTOR_NOT_STARTED, this.bayeuxParameters.getEndpoint());
            throw new ModuleException(notStarted, SalesforceErrorType.SOURCE, new IllegalStateException(notStarted));
        }
        if (replayId != null) {
            logger.debug("Re-subscribing from replayId = " + replayId);
            this.replayExtension.setReplayIdWithOverwrite(topic, replayId.longValue());
        }
        if (currentClient == null) {
            return;
        }
        ClientSessionChannel channel = currentClient.getChannel(topic);
        if (!channel.getSubscribers().isEmpty()) {
            logger.warn("subscribe to channel", "the client was already subscribed", "Throwing an exception", () -> {
                return ConnectorLoggerImpl.quickMap("channel", this.subscribeParams.getTopic(), "fromReplayId", trackedReplayId);
            });
            String alreadySubscribed = String.format(ExceptionMessages.ALREADY_SUBSCRIBED_TO_TOPIC, topic, this.bayeuxParameters.getEndpoint());
            throw new ModuleException(alreadySubscribed, SalesforceErrorType.SOURCE, new IllegalStateException(alreadySubscribed));
        }
        channel.subscribe(this.topicListener, new SubscribeListener(this.sessionControl, this));
    }

    /** Builds the detail part of the "Subscribing to channel" log entry. */
    private static String describeSubscription(BayeuxClient currentClient, String topic, Long requestedReplayId, Optional<Long> trackedReplayId) {
        Object fromReplayId = requestedReplayId != null ? requestedReplayId : trackedReplayId.isPresent() ? trackedReplayId.get() : NOT_AVAILABLE;
        Object handshaked = currentClient != null ? Boolean.valueOf(currentClient.isHandshook()) : NOT_AVAILABLE;
        Object connected = currentClient != null ? Boolean.valueOf(currentClient.isConnected()) : NOT_AVAILABLE;
        Object subscriptions = currentClient != null ? currentClient.getChannel(topic).getSubscribers() : NOT_AVAILABLE;
        return "From replayId " + fromReplayId + ". Handshaked = " + handshaked + ", connected = " + connected + ", isResubscribe = " + requestedReplayId + ", subscriptions = " + subscriptions;
    }

    /** Stops the Bayeux client and then the HTTP client; does nothing if the client is not running. */
    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            bayeuxStop();
            httpStop();
        }
    }

    /**
     * Starts the HTTP client adapter.
     *
     * @throws ModuleException wrapping any exception raised by the adapter
     */
    private void httpStart() {
        try {
            this.httpClientAdapter.start();
        } catch (Exception e) {
            throw new ModuleException(ExceptionMessages.UNABLE_TO_START_HTTP_CLIENT + this.bayeuxParameters.getEndpoint(), SalesforceErrorType.SOURCE, e);
        }
    }

    /** Best-effort stop of the HTTP client adapter: failures are logged at debug level, never rethrown. */
    private void httpStop() {
        if (this.httpClientAdapter == null) {
            return;
        }
        try {
            logger.debug("Stopping the http client", () -> {
                return ConnectorLoggerImpl.quickMap("httpClient", this.httpClientAdapter);
            });
            this.httpClientAdapter.stop();
        } catch (Exception e) {
            // Deliberately not rethrown, but keep the cause visible instead of dropping it.
            logger.debug("Unable to stop the http client. Source will restart and a new http client will be created", () -> {
                return ConnectorLoggerImpl.quickMap("exception", e);
            });
        }
    }

    /**
     * Creates a new Bayeux client for the configured endpoint, registers it in the session control
     * under the topic (disconnecting a previously registered client for that topic first), then adds
     * the replay extension and the listeners and initiates the handshake.
     *
     * @param sessionId session id passed to the HTTP transport
     */
    private void bayeuxStart(String sessionId) {
        logger.debug("Starting the BayeuxClient", () -> {
            return ConnectorLoggerImpl.quickMap("Salesforce instance", this.bayeuxParameters.getEndpoint().toString());
        });
        CustomHttpTransport transport = this.httpClientAdapter.getCustomHttpTransport(new HashMap(this.bayeuxParameters.getLongPollingOptions()), this.bayeuxParameters.getUsername(), sessionId, this);

        final BayeuxClient newClient;
        // Acquire before the try and release in finally so the lock is unlocked exactly once,
        // whether or not the guarded section throws.
        BAYEUX_CLIENT_CREATION_LOCK.lock();
        try {
            if (this.subscribeParams != null && this.sessionControl.clientAlreadyExistsInBayeuxClientsMap(this.subscribeParams.getTopic())) {
                final BayeuxClient previousClient = this.sessionControl.getBayeuxClientFromBayeuxClientsMap(this.subscribeParams.getTopic());
                logger.debug("Disconnecting previous bayeux client and removing listeners", () -> {
                    return ConnectorLoggerImpl.quickMap("oldClient", previousClient);
                });
                removeListeners(previousClient);
                previousClient.disconnect(DISCONNECTING_TIMEOUT);
            } else {
                logger.debug("Could not find an old BayeuxClient to stop before starting the new one");
            }

            newClient = this.bayeuxClientFactory.create(this.bayeuxParameters.getEndpoint().toString(), transport);
            this.client = newClient;
            logger.debug("Creating a new bayeux client", () -> {
                return ConnectorLoggerImpl.quickMap("newClient", newClient);
            });

            if (this.subscribeParams != null) {
                logger.debug("BayeuxClient added to client map", () -> {
                    return ConnectorLoggerImpl.quickMap(CLIENT_MESSAGE, newClient, "topic", this.subscribeParams.getTopic());
                });
                this.sessionControl.addNewClientToBayeuxClientsMap(this.subscribeParams.getTopic(), newClient);
            } else {
                logger.debug("The new BayeuxClient has not been added to client map", () -> {
                    return ConnectorLoggerImpl.quickMap(CLIENT_MESSAGE, newClient);
                });
            }
        } finally {
            BAYEUX_CLIENT_CREATION_LOCK.unlock();
        }

        // Performed outside the lock, as before; a failure here no longer attempts a second unlock.
        newClient.addExtension(this.replayExtension);
        addListeners(newClient);
        newClient.handshake();
    }

    /**
     * Detaches the topic listener, the replay extension and all registered listeners from the
     * current Bayeux client, disconnects it if it is not already disconnected and clears the field.
     * Does nothing when there is no current client.
     */
    private void bayeuxStop() {
        final BayeuxClient currentClient = this.client;
        if (currentClient == null) {
            return;
        }
        logger.debug("Stopping BayeuxClient. Removing client listeners", () -> {
            return ConnectorLoggerImpl.quickMap(CLIENT_MESSAGE, currentClient);
        });
        try {
            currentClient.getChannel(this.subscribeParams.getTopic()).removeListener(this.topicListener);
            logger.debug("Message listener removed successfully", () -> {
                return ConnectorLoggerImpl.quickMap("listener", this.topicListener, "channel", this.subscribeParams.getTopic());
            });
            currentClient.removeExtension(this.replayExtension);
            logger.debug("Message extension removed successfully", () -> {
                return ConnectorLoggerImpl.quickMap("bayeuxClient", currentClient, Constants.EXTENSION, this.replayExtension);
            });
        } catch (Exception e) {
            // Best effort: shutdown continues, but the cause is logged rather than discarded.
            logger.debug("No message listener to remove from channel: " + this.subscribeParams.getTopic(), () -> {
                return ConnectorLoggerImpl.quickMap("exception", e);
            });
        }
        removeListeners(currentClient);
        if (!currentClient.isDisconnected()) {
            logger.debug("Disconnecting BayeuxClient", () -> {
                return ConnectorLoggerImpl.quickMap(CLIENT_MESSAGE, currentClient);
            });
            currentClient.disconnect(DISCONNECTING_TIMEOUT);
        }
        this.client = null;
    }

    /** Removes every registered listener from its channel on the given client. */
    private void removeListeners(BayeuxClient bayeuxClient) {
        for (MessageListenerInfo info : this.listenerInfos) {
            bayeuxClient.getChannel(info.getChannelName()).removeListener(info.getMessageListener());
        }
    }

    /** Adds every registered listener to its channel on the given client. */
    private void addListeners(BayeuxClient bayeuxClient) {
        for (MessageListenerInfo info : this.listenerInfos) {
            bayeuxClient.getChannel(info.getChannelName()).addListener(info.getMessageListener());
        }
    }

    /** Registers a listener for the given channel; it is attached to clients by {@link #addListeners(BayeuxClient)}. */
    private void addListener(String channelName, ClientSessionChannel.MessageListener messageListener) {
        this.listenerInfos.add(new MessageListenerInfo(channelName, messageListener));
    }

    @Override
    public SessionControl getSessionControl() {
        return this.sessionControl;
    }

    /** @return the current Bayeux client, or {@code null} if none has been created or it was stopped */
    @Override
    public BayeuxClient getClient() {
        return this.client;
    }
}
