package scalikejdbc.streams;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import scala.Option;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.Statics;
import scala.util.control.NonFatal$;
import scalikejdbc.Log;
import scalikejdbc.LogSupport;

/* compiled from: DatabasePublisher.scala */
@ScalaSignature(bytes = "\u0006\u0005m3A!\u0003\u0006\u0001\u001f!I1\u0007\u0001BC\u0002\u0013\u0005!\u0002\u000e\u0005\ts\u0001\u0011\t\u0011)A\u0005k!I!\b\u0001BC\u0002\u0013\u0005!b\u000f\u0005\t\u007f\u0001\u0011\t\u0011)A\u0005y!I\u0001\t\u0001BC\u0002\u0013\u0005!\"\u0011\u0005\t\u000b\u0002\u0011\t\u0011)A\u0005\u0005\"1a\t\u0001C\u0001\u0015\u001dCQ\u0001\u0014\u0001\u0005B5\u0013\u0011\u0003R1uC\n\f7/\u001a)vE2L7\u000f[3s\u0015\tYA\"A\u0004tiJ,\u0017-\\:\u000b\u00035\t1b]2bY&\\WM\u001b3cG\u000e\u0001QC\u0001\t$'\u0011\u0001\u0011#G\u0018\u0011\u0005I9R\"A\n\u000b\u0005Q)\u0012\u0001\u00027b]\u001eT\u0011AF\u0001\u0005U\u00064\u0018-\u0003\u0002\u0019'\t1qJ\u00196fGR\u00042AG\u0010\"\u001b\u0005Y\"B\u0001\u000f\u001e\u0003=\u0011X-Y2uSZ,7\u000f\u001e:fC6\u001c(\"\u0001\u0010\u0002\u0007=\u0014x-\u0003\u0002!7\tI\u0001+\u001e2mSNDWM\u001d\t\u0003E\rb\u0001\u0001B\u0003%\u0001\t\u0007QEA\u0001B#\t1C\u0006\u0005\u0002(U5\t\u0001FC\u0001*\u0003\u0015\u00198-\u00197b\u0013\tY\u0003FA\u0004O_RD\u0017N\\4\u0011\u0005\u001dj\u0013B\u0001\u0018)\u0005\r\te.\u001f\t\u0003aEj\u0011\u0001D\u0005\u0003e1\u0011!\u0002T8h'V\u0004\bo\u001c:u\u0003!\u0019X\r\u001e;j]\u001e\u001cX#A\u001b\u0011\u0007Y:\u0014%D\u0001\u000b\u0013\tA$BA\rECR\f'-Y:f!V\u0014G.[:iKJ\u001cV\r\u001e;j]\u001e\u001c\u0018!C:fiRLgnZ:!\u0003\r\u0019\u0018\u000f\\\u000b\u0002yA\u0019a'P\u0011\n\u0005yR!AD*ue\u0016\fWNU3bIf\u001c\u0016\u000bT\u0001\u0005gFd\u0007%A\u0007bgft7-\u0012=fGV$xN]\u000b\u0002\u0005B\u0011agQ\u0005\u0003\t*\u0011Q\"Q:z]\u000e,\u00050Z2vi>\u0014\u0018AD1ts:\u001cW\t_3dkR|'\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\t!K%j\u0013\t\u0004m\u0001\t\u0003\"B\u001a\b\u0001\u0004)\u0004\"\u0002\u001e\b\u0001\u0004a\u0004\"\u0002!\b\u0001\u0004\u0011\u0015!C:vEN\u001c'/\u001b2f)\tq\u0015\u000b\u0005\u0002(\u001f&\u0011\u0001\u000b\u000b\u0002\u0005+:LG\u000fC\u0003S\u0011\u0001\u00071+\u0001\u0006tk\n\u001c8M]5cKJ\u0004$\u0001\u0016-\u0011\u0007i)v+\u0003\u0002W7\tQ1+\u001e2tGJL'-\u001a:\u0011\u0005\tBF!C-R\u0003\u0003\u0005\tQ!\u0001[\u0005\ryF%M\t\u0003C1\u0002")
/* loaded from: input_file:scalikejdbc/streams/DatabasePublisher.class */
public class DatabasePublisher<A> implements Publisher<A>, LogSupport {
    private final DatabasePublisherSettings<A> settings;
    private final StreamReadySQL<A> sql;
    private final AsyncExecutor asyncExecutor;
    private Log log;

    public Log log() {
        return this.log;
    }

    public void scalikejdbc$LogSupport$_setter_$log_$eq(Log log) {
        this.log = log;
    }

    public DatabasePublisherSettings<A> settings() {
        return this.settings;
    }

    public StreamReadySQL<A> sql() {
        return this.sql;
    }

    public AsyncExecutor asyncExecutor() {
        return this.asyncExecutor;
    }

    public void subscribe(Subscriber<? super A> subscriber) {
        BoxedUnit boxedUnit;
        if (subscriber == null) {
            throw new NullPointerException("given Subscriber to DatabasePublisher#subscribe is null. (Reactive Streams spec, 1.9)");
        }
        try {
            DatabaseSubscription databaseSubscription = new DatabaseSubscription(this, subscriber);
            try {
                try {
                    subscriber.onSubscribe(databaseSubscription);
                    databaseSubscription.startNewStreaming();
                    databaseSubscription.prepareCompletionHandler();
                    log().info(() -> {
                        return new StringBuilder(50).append("Database stream requested by subscriber: ").append(subscriber).append(" is ready").toString();
                    });
                } catch (Throwable th) {
                    if (th != null) {
                        Option unapply = NonFatal$.MODULE$.unapply(th);
                        if (!unapply.isEmpty()) {
                            Throwable th2 = (Throwable) unapply.get();
                            log().warn(() -> {
                                return new StringBuilder(68).append("Subscriber#onSubscribe for subscriber: ").append(subscriber).append(" unexpectedly failed because ").append(th2.getMessage()).toString();
                            }, th2);
                            databaseSubscription.onError(th2);
                            return;
                        }
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                if (th3 != null) {
                    Option unapply2 = NonFatal$.MODULE$.unapply(th3);
                    if (!unapply2.isEmpty()) {
                        Throwable th4 = (Throwable) unapply2.get();
                        log().warn(() -> {
                            return new StringBuilder(43).append("Failed to make preparation for subscriber: ").append(subscriber).toString();
                        }, th4);
                        databaseSubscription.onError(th4);
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                }
                throw th3;
            }
        } catch (Throwable th5) {
            if (th5 != null) {
                Option unapply3 = NonFatal$.MODULE$.unapply(th5);
                if (!unapply3.isEmpty()) {
                    Throwable th6 = (Throwable) unapply3.get();
                    if (log().isDebugEnabled()) {
                        log().debug(() -> {
                            return new StringBuilder(68).append("Ignore exceptions for subscriber: ").append(subscriber).append(" to obey Reactive Streams spec 1-9").toString();
                        }, th6);
                        boxedUnit = BoxedUnit.UNIT;
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                    return;
                }
            }
            throw th5;
        }
    }

    public DatabasePublisher(DatabasePublisherSettings<A> databasePublisherSettings, StreamReadySQL<A> streamReadySQL, AsyncExecutor asyncExecutor) {
        this.settings = databasePublisherSettings;
        this.sql = streamReadySQL;
        this.asyncExecutor = asyncExecutor;
        LogSupport.$init$(this);
        Statics.releaseFence();
    }
}
