package io.reactivex.mantis.network.push;

import io.mantisrx.common.MantisGroup;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.reactivx.mantis.operators.DisableBackPressureOperator;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/reactivex/mantis/network/push/ObservableTrigger.class */
/**
 * Static factories for {@link PushTrigger}s that are fed by an {@link Observable}.
 *
 * <p>Every trigger built here has a start callback that subscribes to the source observable and
 * writes each emitted item into the queue handed to that callback, and a stop callback whose
 * behaviour depends on the factory used (see the individual methods). Each trigger also owns a
 * {@link Metrics} instance named {@code "ObservableTrigger_" + name} with a single
 * {@code subscriptionActive} gauge that is incremented on subscribe and decremented on
 * unsubscribe.
 */
public final class ObservableTrigger {
    private static final Logger logger = LoggerFactory.getLogger(ObservableTrigger.class);

    /** Name of the gauge tracking whether the source subscription is currently active. */
    private static final String SUBSCRIPTION_ACTIVE_GAUGE = "subscriptionActive";

    /** Scheduler (fixed pool of five threads) on which per-group inactivity timeouts run. */
    private static final Scheduler timeoutScheduler = Schedulers.from(Executors.newFixedThreadPool(5));

    private ObservableTrigger() {
    }

    /** Builds the per-trigger metrics object containing the {@code subscriptionActive} gauge. */
    private static Metrics buildMetrics(String name) {
        return new Metrics.Builder()
                .name("ObservableTrigger_" + name)
                .addGauge(SUBSCRIPTION_ACTIVE_GAUGE)
                .build();
    }

    /** Decorates {@code source} so that subscribe/unsubscribe are logged and reflected in {@code gauge}. */
    private static <T> Observable<T> monitored(String name, Observable<T> source, Gauge gauge) {
        return source.doOnSubscribe(() -> {
            logger.info("Subscription is ACTIVE for observable trigger with name: {}", name);
            gauge.increment();
        }).doOnUnsubscribe(() -> {
            logger.info("Subscription is INACTIVE for observable trigger with name: {}", name);
            gauge.decrement();
        });
    }

    /** Returns an onError handler that logs the failure and then invokes {@code doOnError} if it is non-null. */
    private static Action1<Throwable> errorHandler(String name, Action1<Throwable> doOnError) {
        return error -> {
            // A trailing Throwable argument is logged by SLF4J as the exception, not as a placeholder value.
            logger.warn("Observable used to push data errored, on server with name: {}", name, error);
            if (doOnError != null) {
                doOnError.call(error);
            }
        };
    }

    /** Returns an onCompleted handler that logs completion and then invokes {@code doOnComplete} if it is non-null. */
    private static Action0 completionHandler(String name, Action0 doOnComplete) {
        return () -> {
            logger.info("Observable used to push data completed, on server with name: {}", name);
            if (doOnComplete != null) {
                doOnComplete.call();
            }
        };
    }

    /**
     * Atomically records {@code fresh} as the current subscription and unsubscribes whichever
     * subscription it replaced, so at most one tracked subscription stays live.
     */
    private static void replaceSubscription(String name, AtomicReference<Subscription> current, Subscription fresh) {
        Subscription previous = current.getAndSet(fresh);
        if (previous != null) {
            logger.info("A new subscription is ACTIVE. Unsubscribe from previous subscription observable trigger with name: {}", name);
            previous.unsubscribe();
        }
    }

    /** Logs {@code message} at WARN level when a subscription is currently recorded; otherwise does nothing. */
    private static void warnIfSubscribed(AtomicReference<Subscription> current, String message) {
        if (current.get() != null) {
            logger.warn(message);
        }
    }

    /**
     * Converts one group into a stream of non-empty batches of key/value pairs.
     *
     * <p>The group key is encoded and hashed once. The group is switched to an empty observable
     * (i.e. completes) if it emits nothing for {@code groupExpirySeconds}; its items are buffered
     * in 250 ms windows and empty windows are dropped.
     */
    private static <K, V> Observable<List<KeyValuePair<K, V>>> toKeyedBatches(GroupedObservable<K, V> group, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        byte[] keyBytes = keyEncoder.call(group.getKey());
        long keyBytesHashed = hashFunction.computeHash(keyBytes);
        return group
                .timeout(groupExpirySeconds, TimeUnit.SECONDS, Observable.<V>empty(), timeoutScheduler)
                .lift(new DisableBackPressureOperator<V>())
                .buffer(250L, TimeUnit.MILLISECONDS)
                .filter(batch -> batch != null && !batch.isEmpty())
                .map(batch -> {
                    List<KeyValuePair<K, V>> pairs = new ArrayList<>(batch.size());
                    for (V value : batch) {
                        pairs.add(new KeyValuePair<K, V>(keyBytesHashed, keyBytes, value));
                    }
                    return pairs;
                });
    }

    /**
     * Trigger that pushes the non-null items of {@code source} into the queue. A new start replaces
     * (and unsubscribes) the previous subscription; stop only logs and keeps the subscription.
     */
    private static <T> PushTrigger<T> trigger(String name, Observable<T> source, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> current = new AtomicReference<>();
        Metrics metrics = buildMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<>(queue -> {
            // Subscribe first, then swap, so the new subscription is live before the old one is torn down.
            Subscription fresh = monitored(name, source.filter(item -> item != null), subscriptionActive)
                    .subscribe(item -> queue.write(item), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
            replaceSubscription(name, current, fresh);
        }, ignoredQueue -> warnIfSubscribed(current,
                "Connections from next stage has dropped to 0. Do not propagate unsubscribe"), metrics);
    }

    /**
     * Trigger for SSE stages: like {@link #trigger} on start, but stop unsubscribes from the source.
     *
     * <p>Start swaps the tracked subscription atomically and unsubscribes any earlier one, so a
     * repeated start cannot leave an untracked subscription writing to the queue. Stop takes the
     * tracked subscription out of the reference before unsubscribing it.
     */
    private static <T> PushTrigger<T> ssetrigger(String name, Observable<T> source, Action0 doOnComplete, Action1<Throwable> doOnError) {
        AtomicReference<Subscription> current = new AtomicReference<>();
        Metrics metrics = buildMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<>(queue -> {
            Subscription fresh = monitored(name, source.filter(item -> item != null), subscriptionActive)
                    .subscribe(item -> queue.write(item), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
            replaceSubscription(name, current, fresh);
        }, ignoredQueue -> {
            // Clear the reference while reading it so each subscription is unsubscribed by exactly one caller.
            Subscription active = current.getAndSet(null);
            if (active != null) {
                logger.warn("Connections from next stage has dropped to 0 for SSE stage. propagate unsubscribe");
                active.unsubscribe();
            }
        }, metrics);
    }

    /**
     * Trigger that flattens a stream of groups into {@link KeyValuePair}s (see
     * {@link #toKeyedBatches}) and writes them to the queue one by one. A new start replaces the
     * previous subscription; stop only logs and keeps the subscription.
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> groupTrigger(String name, Observable<GroupedObservable<K, V>> source, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> current = new AtomicReference<>();
        Metrics metrics = buildMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<>(queue -> {
            Subscription fresh = monitored(name, source, subscriptionActive)
                    .flatMap((GroupedObservable<K, V> group) -> toKeyedBatches(group, groupExpirySeconds, keyEncoder, hashFunction))
                    .subscribe((List<KeyValuePair<K, V>> pairs) -> {
                        for (KeyValuePair<K, V> pair : pairs) {
                            queue.write(pair);
                        }
                    }, errorHandler(name, doOnError), completionHandler(name, doOnComplete));
            replaceSubscription(name, current, fresh);
        }, ignoredQueue -> warnIfSubscribed(current,
                "Connections from next stage has dropped to 0. Do not propagate unsubscribe until a new connection is made."), metrics);
    }

    /**
     * Trigger that maps each {@link MantisGroup} to a {@link KeyValuePair} (encoded key, key hash,
     * value) and writes it to the queue. A new start replaces the previous subscription; stop only
     * logs and keeps the subscription.
     *
     * <p>{@code groupExpirySeconds} is accepted for signature parity with {@link #groupTrigger}
     * but is not used by this method.
     */
    private static <K, V> PushTrigger<KeyValuePair<K, V>> mantisGroupTrigger(String name, Observable<MantisGroup<K, V>> source, Action0 doOnComplete, Action1<Throwable> doOnError, long groupExpirySeconds, Func1<K, byte[]> keyEncoder, HashFunction hashFunction) {
        AtomicReference<Subscription> current = new AtomicReference<>();
        Metrics metrics = buildMetrics(name);
        Gauge subscriptionActive = metrics.getGauge(SUBSCRIPTION_ACTIVE_GAUGE);
        return new PushTrigger<>(queue -> {
            Subscription fresh = monitored(name, source, subscriptionActive)
                    .map((MantisGroup<K, V> group) -> {
                        byte[] keyBytes = keyEncoder.call(group.getKeyValue());
                        return new KeyValuePair<K, V>(hashFunction.computeHash(keyBytes), keyBytes, group.getValue());
                    })
                    .subscribe(pair -> queue.write(pair), errorHandler(name, doOnError), completionHandler(name, doOnComplete));
            replaceSubscription(name, current, fresh);
        }, ignoredQueue -> warnIfSubscribed(current,
                "Connections from next stage has dropped to 0. Do not propagate unsubscribe until a new connection is made."), metrics);
    }

    /**
     * Creates a trigger over a plain observable. The subscription is dropped when the trigger is
     * stopped and re-established on the next start.
     *
     * @param str        trigger name, used in log messages and in the metrics name
     * @param observable source of items; {@code null} items are filtered out
     * @param action0    invoked when the source completes; may be {@code null}
     * @param action1    invoked with the error when the source fails; may be {@code null}
     * @return the configured trigger
     */
    public static <T> PushTrigger<T> o(String str, Observable<T> observable, Action0 action0, Action1<Throwable> action1) {
        return ssetrigger(str, observable, action0, action1);
    }

    /**
     * Creates a trigger over the merge of a stream of observables. Stopping the trigger does not
     * unsubscribe; the subscription is only replaced by a later start.
     *
     * @param str        trigger name, used in log messages and in the metrics name
     * @param observable observables to merge; {@code null} items of the merged stream are filtered out
     * @param action0    invoked when the merged source completes; may be {@code null}
     * @param action1    invoked with the error when the merged source fails; may be {@code null}
     * @return the configured trigger
     */
    public static <T> PushTrigger<T> oo(String str, Observable<Observable<T>> observable, Action0 action0, Action1<Throwable> action1) {
        return trigger(str, Observable.merge(observable), action0, action1);
    }

    /**
     * Creates a trigger over the merge of a stream of grouped-observable streams, emitting one
     * {@link KeyValuePair} per grouped value.
     *
     * @param str          trigger name, used in log messages and in the metrics name
     * @param observable   streams of groups to merge
     * @param action0      invoked when the merged source completes; may be {@code null}
     * @param action1      invoked with the error when the merged source fails; may be {@code null}
     * @param j            seconds of inactivity after which a group is completed
     * @param func1        encodes a group key to bytes
     * @param hashFunction hashes the encoded key bytes
     * @return the configured trigger
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oogo(String str, Observable<Observable<GroupedObservable<K, V>>> observable, Action0 action0, Action1<Throwable> action1, long j, Func1<K, byte[]> func1, HashFunction hashFunction) {
        return groupTrigger(str, Observable.merge(observable), action0, action1, j, func1, hashFunction);
    }

    /**
     * Creates a trigger over the merge of a stream of {@link MantisGroup} streams, emitting one
     * {@link KeyValuePair} per group element.
     *
     * @param str          trigger name, used in log messages and in the metrics name
     * @param observable   streams of Mantis groups to merge
     * @param action0      invoked when the merged source completes; may be {@code null}
     * @param action1      invoked with the error when the merged source fails; may be {@code null}
     * @param j            passed through to the underlying trigger, which does not use it
     * @param func1        encodes a group key to bytes
     * @param hashFunction hashes the encoded key bytes
     * @return the configured trigger
     */
    public static <K, V> PushTrigger<KeyValuePair<K, V>> oomgo(String str, Observable<Observable<MantisGroup<K, V>>> observable, Action0 action0, Action1<Throwable> action1, long j, Func1<K, byte[]> func1, HashFunction hashFunction) {
        return mantisGroupTrigger(str, Observable.merge(observable), action0, action1, j, func1, hashFunction);
    }
}
