package org.ossgang.commons.observables;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.ossgang.commons.observables.exceptions.UnhandledException;
import org.ossgang.commons.observables.exceptions.UpdateDeliveryException;
import org.ossgang.commons.utils.NamedDaemonThreadFactory;

/* loaded from: input_file:org/ossgang/commons/observables/DispatchingObservable.class */
/**
 * Base {@link Observable} implementation that keeps a registry of subscribed observers and delivers
 * values and exceptions to them asynchronously through a shared dispatcher thread pool.
 *
 * <p>Observers are stored in a {@link ConcurrentHashMap} keyed by the observer instance, so each observer
 * has at most one registered subscription at a time. Subclasses can hook into
 * {@link #subscriptionAdded(Observer, Set)} and {@link #subscriptionRemoved(Observer)} to react to
 * registry changes, and use the {@code dispatch*} methods to publish updates.
 *
 * @param <T> the type of the values delivered to observers
 */
public class DispatchingObservable<T> implements Observable<T> {
    /** Pool shared by all instances; every single delivery is submitted to it as its own task. */
    private static final ExecutorService DISPATCHER_POOL = Executors.newCachedThreadPool(NamedDaemonThreadFactory.daemonThreadFactoryWithPrefix("ossgang-commons-DispatchingObservable-dispatcher-"));

    /** Currently registered observers together with the subscription handed out for each of them. */
    private final Map<Observer<? super T>, ObservableSubscription<T>> observers = new ConcurrentHashMap<>();

    /**
     * Subscription handle returned by {@link DispatchingObservable#subscribe(Observer, SubscriptionOption...)}.
     * It remembers the observer, the options it subscribed with and the observable it belongs to.
     *
     * @param <T> the value type of the owning observable
     */
    public static class ObservableSubscription<T> implements Subscription {
        private final Observer<? super T> listener;
        private final Set<SubscriptionOption> options;
        private final DispatchingObservable<T> observable;

        private ObservableSubscription(DispatchingObservable<T> dispatchingObservable, Observer<? super T> observer, Set<SubscriptionOption> set) {
            this.observable = dispatchingObservable;
            this.listener = observer;
            this.options = set;
        }

        /**
         * Removes the observer from the owning observable and then notifies the observer through
         * {@code onUnsubscribe}. The notification is sent on the calling thread, and is sent on every call,
         * even if the observer had already been removed.
         */
        @Override // org.ossgang.commons.observables.Subscription
        public void unsubscribe() {
            this.observable.removeListener(this.listener);
            this.listener.onUnsubscribe(this);
        }
    }

    /**
     * Registers the observer with the given options and notifies it through {@code onSubscribe} on the
     * calling thread.
     *
     * @param observer the observer to register
     * @param subscriptionOptionArr the options of this subscription; duplicates are collapsed
     * @return the subscription that can be used to unsubscribe the observer again
     */
    @Override // org.ossgang.commons.observables.Observable
    public Subscription subscribe(Observer<? super T> observer, SubscriptionOption... subscriptionOptionArr) {
        Set<SubscriptionOption> options = new HashSet<>(Arrays.asList(subscriptionOptionArr));
        ObservableSubscription<T> subscription = addObserver(observer, options);
        observer.onSubscribe(subscription);
        return subscription;
    }

    /**
     * Creates a subscription bound to this observable and stores it in the registry. If the observer was
     * already registered, its entry is replaced and {@link #subscriptionAdded(Observer, Set)} is not invoked
     * again.
     */
    private ObservableSubscription<T> addObserver(Observer<? super T> observer, Set<SubscriptionOption> set) {
        // The subscription needs a reference to this observable so that unsubscribe() can reach removeListener().
        ObservableSubscription<T> subscription = new ObservableSubscription<>(this, observer, set);
        boolean newlyRegistered = this.observers.put(observer, subscription) == null;
        if (newlyRegistered) {
            subscriptionAdded(observer, set);
        }
        return subscription;
    }

    /**
     * Removes the observer from the registry and invokes {@link #subscriptionRemoved(Observer)} if it was
     * actually registered.
     *
     * @param observer the observer to remove
     */
    public void removeListener(Observer<? super T> observer) {
        if (this.observers.remove(observer) != null) {
            subscriptionRemoved(observer);
        }
    }

    /**
     * Hook invoked after an observer that was not registered before has been added. Does nothing by default.
     *
     * @param observer the newly registered observer
     * @param set the options the observer subscribed with
     */
    protected void subscriptionAdded(Observer<? super T> observer, Set<SubscriptionOption> set) {
    }

    /**
     * Hook invoked after a registered observer has been removed. Does nothing by default.
     *
     * @param observer the observer that was removed
     */
    protected void subscriptionRemoved(Observer<? super T> observer) {
    }

    /**
     * Unsubscribes every currently registered observer.
     */
    public void unsubscribeAllObservers() {
        // Work on a snapshot, because unsubscribe() removes entries from the registry while we iterate.
        Set<ObservableSubscription<T>> snapshot = new HashSet<>(this.observers.values());
        for (ObservableSubscription<T> subscription : snapshot) {
            subscription.unsubscribe();
        }
    }

    /**
     * Dispatches the value to all registered observers.
     *
     * @param t the value to deliver
     */
    public void dispatchValue(T t) {
        dispatchValue(t, options -> true);
    }

    /**
     * Dispatches the value to those registered observers whose subscription options are accepted by the
     * predicate. Each delivery is submitted as a separate task via {@link #dispatch(Consumer, Object)}.
     *
     * @param t the value to deliver
     * @param predicate filter applied to the subscription options of each observer
     */
    public void dispatchValue(T t, Predicate<Set<SubscriptionOption>> predicate) {
        for (Map.Entry<Observer<? super T>, ObservableSubscription<T>> entry : this.observers.entrySet()) {
            if (predicate.test(entry.getValue().options)) {
                Observer<? super T> observer = entry.getKey();
                dispatch(observer::onValue, t);
            }
        }
    }

    /**
     * Dispatches the exception to all registered observers.
     *
     * @param th the exception to deliver
     */
    public void dispatchException(Throwable th) {
        dispatchException(th, options -> true);
    }

    /**
     * Dispatches the exception to those registered observers whose subscription options are accepted by the
     * predicate. If no observer was selected at all, the exception is wrapped in an
     * {@link UnhandledException} and passed to the uncaught exception handler instead, so it is not lost.
     *
     * @param th the exception to deliver
     * @param predicate filter applied to the subscription options of each observer
     */
    protected void dispatchException(Throwable th, Predicate<Set<SubscriptionOption>> predicate) {
        boolean submitted = false;
        for (Map.Entry<Observer<? super T>, ObservableSubscription<T>> entry : this.observers.entrySet()) {
            if (predicate.test(entry.getValue().options)) {
                Observer<? super T> observer = entry.getKey();
                dispatch(observer::onException, th);
                submitted = true;
            }
        }
        if (!submitted) {
            ExceptionHandlers.dispatchToUncaughtExceptionHandler(new UnhandledException(th));
        }
    }

    /**
     * Submits a single delivery to the dispatcher pool. An {@link UnhandledException} thrown by the consumer
     * is forwarded to the uncaught exception handler as is; any other {@link Exception} is wrapped in an
     * {@link UpdateDeliveryException} together with the item that could not be delivered.
     *
     * @param consumer the delivery target, typically a method reference on an observer
     * @param x the item to deliver
     * @param <X> the type of the item
     * @return the future of the submitted delivery task
     */
    public <X> Future<?> dispatch(Consumer<X> consumer, X x) {
        return DISPATCHER_POOL.submit(() -> {
            try {
                consumer.accept(x);
            } catch (UnhandledException e) {
                ExceptionHandlers.dispatchToUncaughtExceptionHandler(e);
            } catch (Exception e2) {
                ExceptionHandlers.dispatchToUncaughtExceptionHandler(new UpdateDeliveryException(x, e2));
            }
        });
    }
}
