package com.github.tix320.kiwi.internal.reactive.observable.transform.single;

import com.github.tix320.kiwi.api.check.Try;
import com.github.tix320.kiwi.api.reactive.observable.CompletionType;
import com.github.tix320.kiwi.api.reactive.observable.Observable;
import com.github.tix320.kiwi.api.reactive.observable.Subscriber;
import com.github.tix320.kiwi.api.reactive.observable.Subscription;
import com.github.tix320.kiwi.api.util.None;
import com.github.tix320.kiwi.internal.reactive.observable.TimeoutException;
import com.github.tix320.kiwi.internal.reactive.observable.transform.TransformObservable;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/github/tix320/kiwi/internal/reactive/observable/transform/single/WaitCompleteObservable.class */
public final class WaitCompleteObservable extends TransformObservable<None> {
    private final Observable<?> observable;
    private final Duration timeout;

    public WaitCompleteObservable(Observable<?> observable, Duration duration) {
        this.observable = observable;
        this.timeout = duration;
    }

    @Override // com.github.tix320.kiwi.api.reactive.observable.Observable
    public void subscribe(final Subscriber<? super None> subscriber) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.observable.subscribe(new Subscriber<Object>() { // from class: com.github.tix320.kiwi.internal.reactive.observable.transform.single.WaitCompleteObservable.1
            @Override // com.github.tix320.kiwi.api.reactive.observable.Subscriber
            public void onSubscribe(Subscription subscription) {
                subscriber.onSubscribe(subscription);
            }

            @Override // com.github.tix320.kiwi.api.reactive.observable.Subscriber
            public boolean onPublish(Object obj) {
                return true;
            }

            @Override // com.github.tix320.kiwi.api.reactive.observable.Subscriber
            public boolean onError(Throwable th) {
                return true;
            }

            @Override // com.github.tix320.kiwi.api.reactive.observable.Subscriber
            public void onComplete(CompletionType completionType) {
                if (completionType == CompletionType.SOURCE_COMPLETED) {
                    subscriber.onPublish(None.SELF);
                }
                countDownLatch.countDown();
                subscriber.onComplete(completionType);
            }
        });
        long millis = this.timeout.toMillis();
        if (millis < 0) {
            Objects.requireNonNull(countDownLatch);
            Try.runOrRethrow(countDownLatch::await);
        } else if (!((Boolean) Try.supplyOrRethrow(() -> {
            return Boolean.valueOf(countDownLatch.await(millis, TimeUnit.MILLISECONDS));
        })).booleanValue()) {
            throw new TimeoutException(String.format("The observable not completed in %sms", Long.valueOf(millis)));
        }
    }
}
