package com.azure.cosmos.test.implementation.faultinjection;

import com.azure.cosmos.implementation.CosmosSchedulers;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.faultinjection.RntbdFaultInjectionConnectionCloseEvent;
import com.azure.cosmos.implementation.faultinjection.RntbdFaultInjectionConnectionResetEvent;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionType;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/test/implementation/faultinjection/RntbdConnectionErrorInjector.class */
public class RntbdConnectionErrorInjector {
    private static final Logger LOGGER = LoggerFactory.getLogger(RntbdConnectionErrorInjector.class);
    private final RntbdEndpoint.Provider endpointProvider;
    private final FaultInjectionRuleStore ruleStore;

    public RntbdConnectionErrorInjector(RntbdEndpoint.Provider provider, FaultInjectionRuleStore faultInjectionRuleStore) {
        Preconditions.checkNotNull(provider, "Argument 'endpointProvider' can not be null");
        Preconditions.checkNotNull(faultInjectionRuleStore, "Argument 'ruleStore' can not be null");
        this.endpointProvider = provider;
        this.ruleStore = faultInjectionRuleStore;
    }

    public boolean accept(IFaultInjectionRuleInternal iFaultInjectionRuleInternal) {
        if (iFaultInjectionRuleInternal.getConnectionType() != FaultInjectionConnectionType.DIRECT || !(iFaultInjectionRuleInternal instanceof FaultInjectionConnectionErrorRule)) {
            return false;
        }
        CosmosSchedulers.FAULT_INJECTION_CONNECTION_ERROR_BOUNDED_ELASTIC.schedule(() -> {
            injectConnectionErrorTask((FaultInjectionConnectionErrorRule) iFaultInjectionRuleInternal).subscribe();
        });
        return true;
    }

    public Mono<Void> injectConnectionErrorTask(FaultInjectionConnectionErrorRule faultInjectionConnectionErrorRule) {
        return Mono.delay(faultInjectionConnectionErrorRule.getResult().getInterval()).flatMapMany(l -> {
            if (!isEffectiveRule(faultInjectionConnectionErrorRule)) {
                return Mono.empty();
            }
            faultInjectionConnectionErrorRule.applyRule();
            return (faultInjectionConnectionErrorRule.getAddresses() == null || faultInjectionConnectionErrorRule.getAddresses().size() <= 0) ? (faultInjectionConnectionErrorRule.getRegionEndpoints() == null || faultInjectionConnectionErrorRule.getRegionEndpoints().size() <= 0) ? Flux.fromIterable((Iterable) this.endpointProvider.list().collect(Collectors.toList())).flatMap(rntbdEndpoint -> {
                rntbdEndpoint.injectConnectionErrors(faultInjectionConnectionErrorRule.getId(), faultInjectionConnectionErrorRule.getResult().getThreshold(), getCloseEventType(faultInjectionConnectionErrorRule));
                return Mono.empty();
            }) : Flux.fromIterable(faultInjectionConnectionErrorRule.getRegionEndpoints()).flatMap(uri -> {
                return Flux.fromIterable((Iterable) this.endpointProvider.list().filter(rntbdEndpoint2 -> {
                    return uri.equals(rntbdEndpoint2.serviceEndpoint());
                }).collect(Collectors.toList())).flatMap(rntbdEndpoint3 -> {
                    rntbdEndpoint3.injectConnectionErrors(faultInjectionConnectionErrorRule.getId(), faultInjectionConnectionErrorRule.getResult().getThreshold(), getCloseEventType(faultInjectionConnectionErrorRule));
                    return Mono.empty();
                });
            }) : Flux.fromIterable(faultInjectionConnectionErrorRule.getAddresses()).flatMap(uri2 -> {
                RntbdEndpoint rntbdEndpoint2 = this.endpointProvider.get(uri2);
                if (rntbdEndpoint2 != null) {
                    rntbdEndpoint2.injectConnectionErrors(faultInjectionConnectionErrorRule.getId(), faultInjectionConnectionErrorRule.getResult().getThreshold(), getCloseEventType(faultInjectionConnectionErrorRule));
                }
                return Mono.empty();
            });
        }).onErrorResume(th -> {
            LOGGER.warn("Inject connection error for rule [{}] failed due to", faultInjectionConnectionErrorRule.getId(), th);
            return Mono.empty();
        }).repeat(() -> {
            return isEffectiveRule(faultInjectionConnectionErrorRule);
        }).then().doFinally(signalType -> {
            this.ruleStore.removeRule(faultInjectionConnectionErrorRule);
        });
    }

    private Class<?> getCloseEventType(FaultInjectionConnectionErrorRule faultInjectionConnectionErrorRule) {
        switch (faultInjectionConnectionErrorRule.getResult().getErrorType()) {
            case CONNECTION_RESET:
                return RntbdFaultInjectionConnectionResetEvent.class;
            case CONNECTION_CLOSE:
                return RntbdFaultInjectionConnectionCloseEvent.class;
            default:
                throw new IllegalArgumentException("Connection error type " + faultInjectionConnectionErrorRule.getResult().getErrorType() + " is not supported");
        }
    }

    private boolean isEffectiveRule(FaultInjectionConnectionErrorRule faultInjectionConnectionErrorRule) {
        return this.ruleStore.containsRule(faultInjectionConnectionErrorRule) && faultInjectionConnectionErrorRule.isValid();
    }
}
