package org.mule.extension.salesforce.internal.service.streaming.replayidtracking;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSession;
import org.mule.extension.helpers.logger.ConnectorLogger;
import org.mule.extension.helpers.logger.ConnectorLoggerImpl;

/* loaded from: input_file:repository/com/mulesoft/connectors/mule-salesforce-connector/10.18.2/mule-salesforce-connector-10.18.2-mule-plugin.jar:org/mule/extension/salesforce/internal/service/streaming/replayidtracking/SalesforceReplayExtension.class */
/**
 * CometD client extension that negotiates the {@code "replay"} extension during the
 * handshake, records the replay id carried by each received event, and attaches the last
 * recorded replay id to outgoing subscribe requests.
 *
 * <p>Replay ids are kept in a static map shared by every instance of this class; entries
 * are keyed by the flow name (with underscores removed) and the channel name (with any
 * {@code ?filter} suffix removed), see {@link #getReplayIdMapKey(String)}.
 */
public class SalesforceReplayExtension implements ClientSession.Extension, ReplayTracker {
    private static final String STARTING_WITH_MESSAGE = "Starting with: ";
    private static final String EXTENSION_NAME = "replay";
    private static final String DATA = "data";
    private static final String EVENT = "event";
    private static final String REPLAY_ID = "replayId";
    private static final ConnectorLogger logger = ConnectorLoggerImpl.newInstance(SalesforceReplayExtension.class);
    // Shared by all instances: key is built by getReplayIdMapKey, value is the last known replay id.
    private static final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>();

    // True once the handshake reply has confirmed the "replay" extension.
    private final AtomicBoolean supported = new AtomicBoolean();
    private final String flowName;

    /**
     * @param str name of the flow this extension belongs to; used as the first half of the
     *            replay-id map key
     */
    public SalesforceReplayExtension(String str) {
        this.flowName = str;
    }

    /**
     * Records the replay id found at {@code data.event.replayId} of a received message
     * under the message's channel.
     *
     * @return {@code true} when the extension is not supported, when the message carries no
     *         replay id, or when the replay id was stored; {@code false} when a replay id is
     *         present but cannot be parsed as a {@code long} (the failure is logged)
     */
    @Override // org.cometd.bayeux.client.ClientSession.Extension
    public boolean rcv(ClientSession clientSession, Message.Mutable mutable) {
        if (!this.supported.get()) {
            return true;
        }
        Map<String, Object> event = getMapFromPath(mutable, DATA, EVENT);
        if (event == null) {
            return true;
        }
        Object rawReplayId = event.get(REPLAY_ID);
        if (rawReplayId == null) {
            // No replay id on this message: nothing to record. Checking here avoids
            // parsing the literal string "null" that String.valueOf would produce.
            return true;
        }
        try {
            Long replayId = Long.valueOf(String.valueOf(rawReplayId));
            dataMap.put(getReplayIdMapKey(mutable.getChannel()), replayId);
            return true;
        } catch (NumberFormatException e) {
            // Same failure path the class already used for malformed messages: log and return false.
            logger.error("processing replay extension", e, () -> {
                return ConnectorLoggerImpl.quickMap("message", mutable);
            });
            return false;
        }
    }

    /**
     * On a handshake reply, remembers whether the reply's {@code ext} map contains
     * {@code "replay": true}. Other meta messages are ignored.
     *
     * @return always {@code true}
     */
    @Override // org.cometd.bayeux.client.ClientSession.Extension
    public boolean rcvMeta(ClientSession clientSession, Message.Mutable mutable) {
        if (!Channel.META_HANDSHAKE.equals(mutable.getChannel())) {
            return true;
        }
        Map<String, Object> ext = mutable.getExt(false);
        this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME)));
        return true;
    }

    /**
     * Decorates outgoing meta messages: a handshake gets {@code "replay": true} in its
     * {@code ext} map; a subscribe gets {@code "replay": {topic: replayId}} when the
     * extension is supported and a replay id is known for the topic.
     *
     * @return always {@code true}
     */
    @Override // org.cometd.bayeux.client.ClientSession.Extension
    public boolean sendMeta(ClientSession clientSession, Message.Mutable mutable) {
        if (Channel.META_HANDSHAKE.equals(mutable.getChannel())) {
            mutable.getExt(true).put(EXTENSION_NAME, Boolean.TRUE);
            return true;
        }
        if (!Channel.META_SUBSCRIBE.equals(mutable.getChannel()) || !this.supported.get()) {
            return true;
        }
        Object subscription = mutable.get(Message.SUBSCRIPTION_FIELD);
        if (!(subscription instanceof String)) {
            // Absent or non-String subscription: no single topic to look up, so send the
            // message unchanged instead of failing on a cast or a null dereference.
            return true;
        }
        String topic = removeFilterFromTopicName((String) subscription);
        Long replayId = dataMap.get(getReplayIdMapKey(topic));
        if (replayId == null) {
            return true;
        }
        Map<String, Object> replayByTopic = new HashMap<>();
        replayByTopic.put(topic, replayId);
        mutable.getExt(true).put(EXTENSION_NAME, replayByTopic);
        return true;
    }

    /**
     * Stores {@code j} as the replay id for the channel only if none is stored yet, and
     * logs which value will be used.
     *
     * @param str channel name (a {@code ?filter} suffix is ignored for the lookup key)
     * @param j   replay id to start from when no value is already stored
     */
    @Override // org.mule.extension.salesforce.internal.service.streaming.replayidtracking.ReplayTracker
    public void setStartReplayId(String str, long j) {
        Long putIfAbsent = dataMap.putIfAbsent(getReplayIdMapKey(str), Long.valueOf(j));
        if (putIfAbsent == null) {
            logger.info(String.format("Start replayId for source on flow: \"%s\" and channel: \"%s\" has been set", this.flowName, str), STARTING_WITH_MESSAGE + j);
        } else {
            logger.info(String.format("Last replayId for source on flow: \"%s\" and channel: \"%s\" has been found", this.flowName, str), STARTING_WITH_MESSAGE + putIfAbsent);
        }
    }

    /**
     * Stores {@code j} as the replay id for the channel, replacing any existing value.
     *
     * @param str channel name (a {@code ?filter} suffix is ignored for the lookup key)
     * @param j   replay id to store
     */
    @Override // org.mule.extension.salesforce.internal.service.streaming.replayidtracking.ReplayTracker
    public void setReplayIdWithOverwrite(String str, long j) {
        dataMap.put(getReplayIdMapKey(str), Long.valueOf(j));
        logger.info(String.format("Start replayId for source on flow: \"%s\" and channel: \"%s\" has been overwritten", this.flowName, str), STARTING_WITH_MESSAGE + j);
    }

    /**
     * @param str channel name (a {@code ?filter} suffix is ignored for the lookup key)
     * @return the replay id currently stored for this flow and channel, or empty if none
     */
    @Override // org.mule.extension.salesforce.internal.service.streaming.replayidtracking.ReplayTracker
    public Optional<Long> getReplayId(String str) {
        return Optional.ofNullable(dataMap.get(getReplayIdMapKey(str)));
    }

    /** Returns {@code str} up to, but not including, its first {@code '?'}; unchanged if it has none. */
    private static String removeFilterFromTopicName(String str) {
        int filterStart = str.indexOf('?');
        return filterStart >= 0 ? str.substring(0, filterStart) : str;
    }

    /**
     * Walks nested maps along {@code strArr}.
     *
     * @return the map reached after following every key, or {@code null} as soon as a key is
     *         missing or its value is not a {@link Map}; {@code map} itself when no keys are given
     */
    @SuppressWarnings("unchecked") // guarded by the instanceof check; key/value types are trusted as in the original cast
    private static Map<String, Object> getMapFromPath(Map<String, Object> map, String... strArr) {
        Map<String, Object> current = map;
        for (String key : strArr) {
            Object next = current.get(key);
            if (!(next instanceof Map)) {
                return null;
            }
            current = (Map<String, Object>) next;
        }
        return current;
    }

    /** Builds the {@code dataMap} key: flow name without underscores, {@code '_'}, channel without filter. */
    private String getReplayIdMapKey(String str) {
        return String.format("%s_%s", this.flowName.replace("_", ""), removeFilterFromTopicName(str));
    }
}
