package com.github.tix320.kiwi.internal.reactive.publisher;

import com.github.tix320.kiwi.api.reactive.observable.CompletionType;
import com.github.tix320.kiwi.api.reactive.observable.ConditionalConsumer;
import com.github.tix320.kiwi.api.reactive.observable.Observable;
import com.github.tix320.kiwi.api.reactive.observable.Subscriber;
import com.github.tix320.kiwi.api.reactive.observable.Subscription;
import com.github.tix320.kiwi.api.reactive.publisher.Publisher;
import com.github.tix320.kiwi.api.util.IDGenerator;
import com.github.tix320.kiwi.internal.reactive.CompletedException;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

/* loaded from: input_file:com/github/tix320/kiwi/internal/reactive/publisher/BasePublisher.class */
/**
 * Base implementation of {@link Publisher} that keeps track of its subscribers and delivers
 * published items, errors and completion to them.
 *
 * <p>Subscriber registration, the snapshot taken before each publish, the {@link #prePublish} /
 * {@link #postPublish} hooks and {@link #complete()} all run while holding {@code publishLock}.
 * Delivery of an item or error to the individual subscribers happens outside of that lock.
 *
 * @param <T> type of the published items
 */
public abstract class BasePublisher<T> implements Publisher<T> {
    private final IDGenerator subscriberIdGenerator = new IDGenerator(1);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final Collection<InternalSubscription> subscriptions = new ConcurrentLinkedQueue<>();
    private final Lock publishLock = new ReentrantLock();

    /**
     * Wraps a user supplied {@link Subscriber} and doubles as the {@link Subscription} handed to it.
     *
     * <p>Every callback is forwarded under {@code subscriptionLock} and the call order is validated:
     * {@code onSubscribe} exactly once and first, {@code onComplete} at most once, and no
     * {@code onPublish}/{@code onError} after {@code onComplete}.
     */
    public final class InternalSubscription implements Subscriber<T>, Subscription {
        private final long id;
        private final Subscriber<? super T> subscriber;
        private final Lock subscriptionLock = new ReentrantLock();
        private volatile boolean onSubscribeCalled = false;
        private volatile boolean onCompleteCalled = false;

        /** Set by {@link #cancel(CompletionType)}; once true, items and errors are silently dropped. */
        public volatile boolean completed = false;

        private InternalSubscription(Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            this.id = subscriberIdGenerator.next();
        }

        /**
         * Forwards the subscription to the wrapped subscriber.
         *
         * @throws SubscriptionIllegalStateException if called more than once
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            subscriptionLock.lock();
            try {
                if (onSubscribeCalled) {
                    throw new SubscriptionIllegalStateException("OnSubscribe must be called only once");
                }
                onSubscribeCalled = true;
                subscriber.onSubscribe(subscription);
            } finally {
                subscriptionLock.unlock();
            }
        }

        /**
         * Forwards an item to the wrapped subscriber.
         *
         * @return {@code false} if this subscription is already cancelled, otherwise the value
         *         returned by the wrapped subscriber's {@code onPublish}
         * @throws SubscriptionIllegalStateException if called before {@code onSubscribe} or after
         *         {@code onComplete}
         */
        @Override
        public boolean onPublish(T t) {
            subscriptionLock.lock();
            try {
                if (this.completed) {
                    return false;
                }
                if (!onSubscribeCalled) {
                    throw new SubscriptionIllegalStateException("OnPublish must be called only after onSubscribe");
                }
                if (onCompleteCalled) {
                    throw new SubscriptionIllegalStateException("OnPublish must not be called after onComplete");
                }
                return subscriber.onPublish(t);
            } finally {
                subscriptionLock.unlock();
            }
        }

        /**
         * Forwards an error to the wrapped subscriber.
         *
         * @return {@code false} if this subscription is already cancelled, otherwise the value
         *         returned by the wrapped subscriber's {@code onError}
         * @throws SubscriptionIllegalStateException if called before {@code onSubscribe} or after
         *         {@code onComplete}
         */
        @Override
        public boolean onError(Throwable th) {
            subscriptionLock.lock();
            try {
                if (this.completed) {
                    return false;
                }
                if (!onSubscribeCalled) {
                    throw new SubscriptionIllegalStateException("OnError must be called only after onSubscribe");
                }
                if (onCompleteCalled) {
                    throw new SubscriptionIllegalStateException("OnError must not be called after onComplete");
                }
                return subscriber.onError(th);
            } finally {
                subscriptionLock.unlock();
            }
        }

        /**
         * Forwards completion to the wrapped subscriber.
         *
         * @throws SubscriptionIllegalStateException if called before {@code onSubscribe} or more
         *         than once
         */
        @Override
        public void onComplete(CompletionType completionType) {
            subscriptionLock.lock();
            try {
                if (!onSubscribeCalled) {
                    throw new SubscriptionIllegalStateException("OnComplete must be called only after onSubscribe");
                }
                if (onCompleteCalled) {
                    throw new SubscriptionIllegalStateException("OnComplete must be called only once");
                }
                onCompleteCalled = true;
                subscriber.onComplete(completionType);
            } finally {
                subscriptionLock.unlock();
            }
        }

        @Override
        public boolean isCompleted() {
            return this.completed;
        }

        /**
         * Marks this subscription as completed and removes it from the publisher. The wrapped
         * subscriber's {@code onComplete} is invoked only by the call that actually removed the
         * subscription, so repeated cancels notify the subscriber at most once.
         *
         * @param completionType reason reported to the subscriber
         */
        public void cancel(CompletionType completionType) {
            subscriptionLock.lock();
            try {
                this.completed = true;
                if (subscriptions.remove(this)) {
                    // subscriptionLock is reentrant, so calling onComplete while holding it is safe.
                    onComplete(completionType);
                }
            } finally {
                subscriptionLock.unlock();
            }
        }

        @Override
        public void unsubscribe() {
            cancel(CompletionType.UNSUBSCRIPTION);
        }

        /** Two subscriptions are equal when they are of the same class and carry the same id. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return this.id == ((BasePublisher<?>.InternalSubscription) obj).id;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }

    /** {@link Observable} view of the enclosing publisher, returned by {@link #asObservable()}. */
    public final class PublisherObservable implements Observable<T> {
        public PublisherObservable() {
        }

        /**
         * Registers the subscriber, calls its {@code onSubscribe} and lets the publisher feed it
         * through {@link BasePublisher#onNewSubscriber}, all while holding the publish lock.
         * After the lock is released the subscription is cancelled with
         * {@link CompletionType#SOURCE_COMPLETED} if the publisher is already completed, or with
         * {@link CompletionType#UNSUBSCRIPTION} if the subscriber's last {@code onPublish} during
         * registration returned {@code false}.
         */
        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            final InternalSubscription subscription;
            // Tracks the most recent onPublish result produced while feeding the new subscriber.
            final AtomicBoolean wantsMore = new AtomicBoolean(true);

            publishLock.lock();
            try {
                subscription = new InternalSubscription(subscriber);
                subscriptions.add(subscription);
                subscription.onSubscribe(subscription);
                onNewSubscriber(item -> {
                    boolean accepted = subscription.onPublish(item);
                    wantsMore.set(accepted);
                    return accepted;
                });
            } finally {
                // Released exactly once, whether or not registration failed.
                publishLock.unlock();
            }

            // Runs without the publish lock; a failure here must not trigger a second unlock.
            if (BasePublisher.this.completed.get()) {
                subscription.cancel(CompletionType.SOURCE_COMPLETED);
            } else if (!wantsMore.get()) {
                subscription.cancel(CompletionType.UNSUBSCRIPTION);
            }
        }
    }

    /**
     * Publishes an item to the current subscribers.
     *
     * @throws CompletedException if this publisher is already completed
     */
    @Override
    public final void publish(T t) {
        publishObject(t, true);
    }

    /**
     * Publishes an error to the current subscribers.
     *
     * @throws CompletedException if this publisher is already completed
     */
    @Override
    public final void publishError(Throwable th) {
        publishObject(th, false);
    }

    /**
     * Cancels every registered subscription with {@link CompletionType#SOURCE_COMPLETED} and then
     * marks this publisher as completed.
     *
     * @throws CompletedException if this publisher is already completed
     */
    @Override
    public final void complete() {
        publishLock.lock();
        try {
            failIfCompleted();
            // cancel() removes the element from the queue that is being iterated here.
            for (InternalSubscription subscription : subscriptions) {
                subscription.cancel(CompletionType.SOURCE_COMPLETED);
            }
            completed.set(true);
        } finally {
            publishLock.unlock();
        }
    }

    @Override
    public final boolean isCompleted() {
        return completed.get();
    }

    @Override
    public Observable<T> asObservable() {
        return new PublisherObservable();
    }

    /**
     * Hook invoked for every new subscriber, under the publish lock and after the subscriber's
     * {@code onSubscribe}. The supplied consumer forwards an item to that subscriber and returns
     * the subscriber's {@code onPublish} result.
     *
     * @param conditionalConsumer sink delivering items to the newly registered subscriber
     */
    protected abstract void onNewSubscriber(ConditionalConsumer<T> conditionalConsumer);

    /**
     * Hook invoked under the publish lock before an item (not an error) is delivered.
     *
     * @param t the item about to be published
     */
    protected void prePublish(T t) {
    }

    /** Hook invoked under the publish lock after an item (not an error) has been delivered. */
    protected void postPublish() {
    }

    /**
     * Delivers an item or an error to a snapshot of the active subscriptions.
     *
     * @param payload the item (of type {@code T}) when {@code isItem} is true, otherwise a {@link Throwable}
     * @param isItem  {@code true} for a regular item, {@code false} for an error
     * @throws CompletedException if this publisher is already completed
     */
    private void publishObject(Object payload, boolean isItem) {
        final Collection<InternalSubscription> snapshot;

        publishLock.lock();
        try {
            failIfCompleted();
            snapshot = getSubscriptionsCopy();
            if (isItem) {
                @SuppressWarnings("unchecked") // publish(T) is the only caller passing isItem == true
                T item = (T) payload;
                prePublish(item);
            }
        } finally {
            // Must be released even when the publisher is completed or the hook throws.
            publishLock.unlock();
        }

        // Delivery happens without the publish lock; one failing subscriber must not stop the rest.
        for (InternalSubscription subscription : snapshot) {
            try {
                publishObjectToOneSubscriber(subscription, payload, isItem);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        publishLock.lock();
        try {
            if (isItem) {
                postPublish();
            }
        } finally {
            publishLock.unlock();
        }
    }

    /**
     * Delivers the payload to a single subscription and cancels that subscription with
     * {@link CompletionType#UNSUBSCRIPTION} when the callback returns {@code false}.
     */
    @SuppressWarnings("unchecked") // payload is a T whenever isItem is true
    private void publishObjectToOneSubscriber(InternalSubscription subscription, Object payload, boolean isItem) {
        boolean wantsMore = isItem
                ? subscription.onPublish((T) payload)
                : subscription.onError((Throwable) payload);
        if (!wantsMore) {
            subscription.cancel(CompletionType.UNSUBSCRIPTION);
        }
    }

    private void failIfCompleted() {
        if (completed.get()) {
            throw new CompletedException("Publisher is completed, you can not complete again or publish items.");
        }
    }

    /** Returns a snapshot list of the subscriptions that are not yet cancelled. */
    private Collection<InternalSubscription> getSubscriptionsCopy() {
        return subscriptions.stream()
                .filter(subscription -> !subscription.completed)
                .collect(Collectors.toList());
    }
}
