package org.apache.iotdb.db.subscription.agent;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iotdb.db.subscription.broker.SubscriptionBroker;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtask;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.class */
/**
 * Registry of {@link SubscriptionBroker} instances keyed by consumer group id.
 *
 * <p>Every public operation resolves the broker registered for a consumer group and delegates to
 * it. Consumer-facing operations ({@link #poll}, {@link #pollTsFile}, {@link #pollTablets},
 * {@link #commit}) throw a {@link SubscriptionException} when no broker is registered; the
 * remaining operations log a warning and return a neutral value instead.
 *
 * <p>The registry itself is a {@link ConcurrentHashMap}; create/drop decisions are made inside
 * its atomic {@code computeIfAbsent}/{@code compute} callbacks.
 */
public class SubscriptionBrokerAgent {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionBrokerAgent.class);

    /** Consumer group id -> broker serving that consumer group. */
    private final Map<String, SubscriptionBroker> consumerGroupIdToSubscriptionBroker = new ConcurrentHashMap<>();

    /**
     * Polls events from the broker of the consumer's group.
     *
     * @param consumerConfig supplies the consumer group id (broker lookup) and the consumer id
     * @param set forwarded unchanged to {@link SubscriptionBroker#poll} (presumably the topic
     *     names to poll; confirm against the broker)
     * @param j forwarded unchanged to {@link SubscriptionBroker#poll}
     * @return whatever the broker returns
     * @throws SubscriptionException if no broker is registered for the consumer group
     */
    public List<SubscriptionEvent> poll(ConsumerConfig consumerConfig, Set<String> set, long j) {
        return getBrokerOrThrow(consumerConfig.getConsumerGroupId())
                .poll(consumerConfig.getConsumerId(), set, j);
    }

    /**
     * Delegates to {@link SubscriptionBroker#pollTsFile} on the broker of the consumer's group.
     *
     * @param consumerConfig supplies the consumer group id (broker lookup) and the consumer id
     * @param subscriptionCommitContext forwarded unchanged to the broker
     * @param j forwarded unchanged to the broker
     * @return whatever the broker returns
     * @throws SubscriptionException if no broker is registered for the consumer group
     */
    public List<SubscriptionEvent> pollTsFile(ConsumerConfig consumerConfig, SubscriptionCommitContext subscriptionCommitContext, long j) {
        return getBrokerOrThrow(consumerConfig.getConsumerGroupId())
                .pollTsFile(consumerConfig.getConsumerId(), subscriptionCommitContext, j);
    }

    /**
     * Delegates to {@link SubscriptionBroker#pollTablets} on the broker of the consumer's group.
     *
     * @param consumerConfig supplies the consumer group id (broker lookup) and the consumer id
     * @param subscriptionCommitContext forwarded unchanged to the broker
     * @param i forwarded unchanged to the broker
     * @return whatever the broker returns
     * @throws SubscriptionException if no broker is registered for the consumer group
     */
    public List<SubscriptionEvent> pollTablets(ConsumerConfig consumerConfig, SubscriptionCommitContext subscriptionCommitContext, int i) {
        return getBrokerOrThrow(consumerConfig.getConsumerGroupId())
                .pollTablets(consumerConfig.getConsumerId(), subscriptionCommitContext, i);
    }

    /**
     * Delegates to {@link SubscriptionBroker#commit} on the broker of the consumer's group.
     *
     * @param consumerConfig supplies the consumer group id (broker lookup) and the consumer id
     * @param list commit contexts, forwarded unchanged to the broker
     * @param z forwarded unchanged to the broker
     * @return whatever the broker returns
     * @throws SubscriptionException if no broker is registered for the consumer group
     */
    public List<SubscriptionCommitContext> commit(ConsumerConfig consumerConfig, List<SubscriptionCommitContext> list, boolean z) {
        return getBrokerOrThrow(consumerConfig.getConsumerGroupId())
                .commit(consumerConfig.getConsumerId(), list, z);
    }

    /**
     * @param str consumer group id
     * @return {@code true} if a broker is currently registered under {@code str}
     */
    public boolean isBrokerExist(String str) {
        return this.consumerGroupIdToSubscriptionBroker.containsKey(str);
    }

    /**
     * Registers a new broker for the consumer group unless one is already registered.
     *
     * @param str consumer group id
     */
    public void createBrokerIfNotExist(String str) {
        this.consumerGroupIdToSubscriptionBroker.computeIfAbsent(str, consumerGroupId -> {
            // Log inside the mapping function so the message is emitted only when a broker is
            // actually created, not on every call for an already-registered group.
            LOGGER.info("Subscription: create broker bound to consumer group [{}]", consumerGroupId);
            return new SubscriptionBroker(consumerGroupId);
        });
    }

    /**
     * Removes the broker of a consumer group if it is empty.
     *
     * @param str consumer group id
     * @return {@code true} if no broker remains registered under {@code str} after the call
     *     (it was absent already, or it was empty and has been removed); {@code false} if the
     *     broker is not empty and was therefore kept
     */
    public boolean dropBroker(String str) {
        AtomicBoolean dropped = new AtomicBoolean(false);
        // compute() makes the emptiness check and the removal one atomic step for this key.
        this.consumerGroupIdToSubscriptionBroker.compute(str, (consumerGroupId, broker) -> {
            if (Objects.isNull(broker)) {
                LOGGER.warn("Subscription: broker bound to consumer group [{}] does not exist", str);
                // An absent broker is reported as dropped.
                dropped.set(true);
                return null;
            }
            if (!broker.isEmpty()) {
                LOGGER.warn("Subscription: broker bound to consumer group [{}] is not empty when dropping", str);
                return broker; // keep the mapping
            }
            dropped.set(true);
            LOGGER.info("Subscription: drop broker bound to consumer group [{}]", str);
            return null; // returning null removes the mapping
        });
        return dropped.get();
    }

    /**
     * Binds the subtask's input pending queue to its topic on the broker of the subtask's
     * consumer group, creating that broker first if none is registered.
     *
     * @param subscriptionConnectorSubtask supplies the consumer group id, topic name and input
     *     pending queue
     */
    public void bindPrefetchingQueue(SubscriptionConnectorSubtask subscriptionConnectorSubtask) {
        String consumerGroupId = subscriptionConnectorSubtask.getConsumerGroupId();
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.computeIfAbsent(consumerGroupId, id -> {
            LOGGER.info("Subscription: broker bound to consumer group [{}] does not exist, create new for binding prefetching queue", consumerGroupId);
            return new SubscriptionBroker(consumerGroupId);
        });
        broker.bindPrefetchingQueue(subscriptionConnectorSubtask.getTopicName(), subscriptionConnectorSubtask.getInputPendingQueue());
    }

    /**
     * Delegates to {@link SubscriptionBroker#unbindPrefetchingQueue}; logs a warning and does
     * nothing if no broker is registered for the consumer group.
     *
     * @param str consumer group id
     * @param str2 forwarded to the broker (presumably the topic name; confirm against the broker)
     */
    public void unbindPrefetchingQueue(String str, String str2) {
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.get(str);
        if (Objects.isNull(broker)) {
            LOGGER.warn("Subscription: broker bound to consumer group [{}] does not exist", str);
            return;
        }
        broker.unbindPrefetchingQueue(str2);
    }

    /**
     * Delegates to {@link SubscriptionBroker#removePrefetchingQueue}; logs a warning and does
     * nothing if no broker is registered for the consumer group.
     *
     * @param str consumer group id
     * @param str2 forwarded to the broker (presumably the topic name; confirm against the broker)
     */
    public void removePrefetchingQueue(String str, String str2) {
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.get(str);
        if (Objects.isNull(broker)) {
            LOGGER.warn("Subscription: broker bound to consumer group [{}] does not exist", str);
            return;
        }
        broker.removePrefetchingQueue(str2);
    }

    /**
     * Delegates to {@link SubscriptionBroker#executePrefetch}.
     *
     * @param str consumer group id
     * @param str2 forwarded to the broker (presumably the topic name; confirm against the broker)
     * @return the broker's result, or {@code false} (after a warning) if no broker is registered
     */
    public boolean executePrefetch(String str, String str2) {
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.get(str);
        if (Objects.isNull(broker)) {
            LOGGER.warn("Subscription: broker bound to consumer group [{}] does not exist", str);
            return false;
        }
        return broker.executePrefetch(str2);
    }

    /**
     * Delegates to {@link SubscriptionBroker#getPipeEventCount}.
     *
     * @param str consumer group id
     * @param str2 forwarded to the broker (presumably the topic name; confirm against the broker)
     * @return the broker's result, or {@code 0} (after a warning) if no broker is registered
     */
    public int getPipeEventCount(String str, String str2) {
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.get(str);
        if (Objects.isNull(broker)) {
            LOGGER.warn("Subscription: broker bound to consumer group [{}] does not exist", str);
            return 0;
        }
        return broker.getPipeEventCount(str2);
    }

    /**
     * @return the sum of {@link SubscriptionBroker#getPrefetchingQueueCount()} over all
     *     registered brokers; {@code 0} when none are registered
     */
    public int getPrefetchingQueueCount() {
        return this.consumerGroupIdToSubscriptionBroker.values().stream()
                .mapToInt(SubscriptionBroker::getPrefetchingQueueCount)
                .sum();
    }

    /**
     * Looks up the broker for a consumer group, failing loudly when it is missing.
     *
     * @param consumerGroupId consumer group id to resolve
     * @return the registered broker, never {@code null}
     * @throws SubscriptionException if no broker is registered; the same message is also logged
     *     at WARN level
     */
    private SubscriptionBroker getBrokerOrThrow(String consumerGroupId) {
        SubscriptionBroker broker = this.consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
        if (Objects.isNull(broker)) {
            String message = String.format("Subscription: broker bound to consumer group [%s] does not exist", consumerGroupId);
            LOGGER.warn(message);
            throw new SubscriptionException(message);
        }
        return broker;
    }
}
