package org.apache.flume.sink;

import com.google.common.base.Preconditions;
import java.util.Iterator;
import java.util.List;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Sink;
import org.apache.flume.conf.Configurable;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.util.OrderSelector;
import org.apache.flume.util.RandomOrderSelector;
import org.apache.flume.util.RoundRobinOrderSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/sink/LoadBalancingSinkProcessor.class */
public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {
    public static final String CONFIG_SELECTOR = "selector";
    public static final String CONFIG_SELECTOR_PREFIX = "selector.";
    public static final String CONFIG_BACKOFF = "backoff";
    public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
    public static final String SELECTOR_NAME_RANDOM = "RANDOM";
    public static final String SELECTOR_NAME_ROUND_ROBIN_BACKOFF = "ROUND_ROBIN_BACKOFF";
    public static final String SELECTOR_NAME_RANDOM_BACKOFF = "RANDOM_BACKOFF";
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) LoadBalancingSinkProcessor.class);
    private SinkSelector selector;

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/sink/LoadBalancingSinkProcessor$RandomOrderSinkSelector.class */
    private static class RandomOrderSinkSelector extends AbstractSinkSelector {
        private OrderSelector<Sink> selector;

        RandomOrderSinkSelector(boolean z) {
            this.selector = new RandomOrderSelector(z);
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.conf.Configurable
        public void configure(Context context) {
            super.configure(context);
            if (this.maxTimeOut != 0) {
                this.selector.setMaxTimeOut(this.maxTimeOut);
            }
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public void setSinks(List<Sink> list) {
            this.selector.setObjects(list);
        }

        @Override // org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public Iterator<Sink> createSinkIterator() {
            return this.selector.createIterator();
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public void informSinkFailed(Sink sink) {
            this.selector.informFailure(sink);
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/sink/LoadBalancingSinkProcessor$RoundRobinSinkSelector.class */
    private static class RoundRobinSinkSelector extends AbstractSinkSelector {
        private OrderSelector<Sink> selector;

        RoundRobinSinkSelector(boolean z) {
            this.selector = new RoundRobinOrderSelector(z);
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.conf.Configurable
        public void configure(Context context) {
            super.configure(context);
            if (this.maxTimeOut != 0) {
                this.selector.setMaxTimeOut(this.maxTimeOut);
            }
        }

        @Override // org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public Iterator<Sink> createSinkIterator() {
            return this.selector.createIterator();
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public void setSinks(List<Sink> list) {
            this.selector.setObjects(list);
        }

        @Override // org.apache.flume.sink.AbstractSinkSelector, org.apache.flume.sink.LoadBalancingSinkProcessor.SinkSelector
        public void informSinkFailed(Sink sink) {
            this.selector.informFailure(sink);
        }
    }

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-ng-core-1.11.0.jar:org/apache/flume/sink/LoadBalancingSinkProcessor$SinkSelector.class */
    public interface SinkSelector extends Configurable, LifecycleAware {
        void setSinks(List<Sink> list);

        Iterator<Sink> createSinkIterator();

        void informSinkFailed(Sink sink);
    }

    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        Preconditions.checkState(getSinks().size() > 1, "The LoadBalancingSinkProcessor cannot be used for a single sink. Please configure more than one sinks and try again.");
        String string = context.getString(CONFIG_SELECTOR, "ROUND_ROBIN");
        Boolean bool = context.getBoolean("backoff", false);
        this.selector = null;
        if (string.equalsIgnoreCase("ROUND_ROBIN")) {
            this.selector = new RoundRobinSinkSelector(bool.booleanValue());
        } else if (string.equalsIgnoreCase("RANDOM")) {
            this.selector = new RandomOrderSinkSelector(bool.booleanValue());
        } else {
            try {
                this.selector = (SinkSelector) Class.forName(string).newInstance();
            } catch (Exception e) {
                throw new FlumeException("Unable to instantiate sink selector: " + string, e);
            }
        }
        this.selector.setSinks(getSinks());
        this.selector.configure(new Context(context.getSubProperties("selector.")));
        LOGGER.debug("Sink selector: " + this.selector + " initialized");
    }

    @Override // org.apache.flume.sink.AbstractSinkProcessor, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        super.start();
        this.selector.start();
    }

    @Override // org.apache.flume.sink.AbstractSinkProcessor, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        super.stop();
        this.selector.stop();
    }

    @Override // org.apache.flume.SinkProcessor
    public Sink.Status process() throws EventDeliveryException {
        Sink.Status status = null;
        Iterator<Sink> createSinkIterator = this.selector.createSinkIterator();
        while (createSinkIterator.hasNext()) {
            Sink next = createSinkIterator.next();
            try {
                status = next.process();
                break;
            } catch (Exception e) {
                this.selector.informSinkFailed(next);
                LOGGER.warn("Sink failed to consume event. Attempting next sink if available.", (Throwable) e);
            }
        }
        if (status == null) {
            throw new EventDeliveryException("All configured sinks have failed");
        }
        return status;
    }
}
