package hu.akarnokd.reactive4java.util;

import hu.akarnokd.reactive4java.base.ConnectableObservable;
import hu.akarnokd.reactive4java.base.Observable;
import hu.akarnokd.reactive4java.base.Observer;
import hu.akarnokd.reactive4java.base.Subject;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:hu/akarnokd/reactive4java/util/DefaultConnectableObservable.class */
/**
 * Connectable observable that links a source observable to a subject on demand.
 * <p>Observers register with the subject through {@link #register(Observer)};
 * the subject itself is registered with the source only once {@link #connect()}
 * is called. Closing the handle returned by {@code connect()} closes that
 * registration and allows a later {@code connect()} to establish a new one.</p>
 * <p>The current connection is guarded by {@link #lock}.</p>
 *
 * @param <T> the element type of the source
 * @param <U> the element type delivered to registered observers
 */
public class DefaultConnectableObservable<T, U> implements ConnectableObservable<U> {

    /** The subject that is registered with the source and that observers register with. */
    @Nonnull
    protected final Subject<? super T, ? extends U> subject;

    /** The upstream observable the subject gets registered with on connect. */
    @Nonnull
    protected final Observable<? extends T> source;

    /** The lock protecting {@link #connection} and the state of the connection handles. */
    @Nonnull
    protected final Lock lock;

    /** The handle of the active connection, or {@code null} when not connected. */
    @GuardedBy("lock")
    @Nullable
    protected Closeable connection;

    /**
     * Handle of a single connection to the source.
     * <p>The first {@link #close()} clears the enclosing observable's
     * {@code connection} field and closes the wrapped handle; further calls
     * do nothing.</p>
     */
    protected class InnerConnection implements Closeable {

        /** The wrapped registration handle; set to {@code null} once closed. */
        @GuardedBy("lock")
        @Nullable
        protected Closeable c;

        /**
         * Wraps the given registration handle.
         *
         * @param closeable the handle to close when this connection is closed
         */
        public InnerConnection(@Nonnull Closeable closeable) {
            this.c = closeable;
        }

        /**
         * Disconnects from the source if this handle has not been closed yet.
         *
         * @throws IOException if closing the wrapped handle fails; the connection
         *         state has already been cleared at that point
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            Closeable toClose;
            DefaultConnectableObservable.this.lock.lock();
            try {
                toClose = this.c;
                if (toClose != null) {
                    this.c = null;
                    DefaultConnectableObservable.this.connection = null;
                }
            } finally {
                DefaultConnectableObservable.this.lock.unlock();
            }
            // Close outside the lock: the wrapped handle is foreign code, and running
            // it while holding the lock would block concurrent connect()/close() calls
            // and could deadlock if it re-enters through a non-reentrant Lock.
            if (toClose != null) {
                toClose.close();
            }
        }
    }

    /**
     * Creates a connectable observable guarded by a new {@link ReentrantLock}
     * whose fairness is taken from {@code R4JConfigManager.get().useFairLocks()}.
     *
     * @param observable the source observable
     * @param subject the subject connecting the source to the observers
     */
    public DefaultConnectableObservable(@Nonnull Observable<? extends T> observable, @Nonnull Subject<? super T, ? extends U> subject) {
        this(observable, subject, new ReentrantLock(R4JConfigManager.get().useFairLocks()));
    }

    /**
     * Creates a connectable observable guarded by the supplied lock.
     *
     * @param observable the source observable
     * @param subject the subject connecting the source to the observers
     * @param lock the lock protecting the connection state
     */
    public DefaultConnectableObservable(@Nonnull Observable<? extends T> observable, @Nonnull Subject<? super T, ? extends U> subject, @Nonnull Lock lock) {
        this.subject = subject;
        this.source = observable;
        this.lock = lock;
    }

    /**
     * Registers the subject with the source unless a connection is already active.
     *
     * @return the handle of the active connection; the same handle is returned
     *         until it is closed
     */
    @Override // hu.akarnokd.reactive4java.base.ConnectableObservable
    @Nonnull
    public Closeable connect() {
        this.lock.lock();
        try {
            Closeable current = this.connection;
            if (current == null) {
                // Registration happens under the lock so that concurrent callers
                // cannot register the subject with the source twice.
                current = new InnerConnection(this.source.register(this.subject));
                this.connection = current;
            }
            return current;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Registers the observer with the subject; the connection state is not touched.
     *
     * @param observer the observer to register
     * @return the handle returned by the subject's {@code register} call
     */
    @Override // hu.akarnokd.reactive4java.base.Observable
    @Nonnull
    public Closeable register(@Nonnull Observer<? super U> observer) {
        return this.subject.register(observer);
    }
}
