package scalikejdbc.streams;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import scala.Option;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.util.control.NonFatal$;
import scalikejdbc.Log;
import scalikejdbc.LogSupport;

/* compiled from: DatabasePublisher.scala */
@ScalaSignature(bytes = "\u0006\u0003m3A!\u0001\u0002\u0001\u000f\t\tB)\u0019;bE\u0006\u001cX\rU;cY&\u001c\b.\u001a:\u000b\u0005\r!\u0011aB:ue\u0016\fWn\u001d\u0006\u0002\u000b\u0005Y1oY1mS.,'\u000e\u001a2d\u0007\u0001)\"\u0001C\u000e\u0014\t\u0001I\u0011c\n\t\u0003\u0015=i\u0011a\u0003\u0006\u0003\u00195\tA\u0001\\1oO*\ta\"\u0001\u0003kCZ\f\u0017B\u0001\t\f\u0005\u0019y%M[3diB\u0019!cF\r\u000e\u0003MQ!\u0001F\u000b\u0002\u001fI,\u0017m\u0019;jm\u0016\u001cHO]3b[NT\u0011AF\u0001\u0004_J<\u0017B\u0001\r\u0014\u0005%\u0001VO\u00197jg\",'\u000f\u0005\u0002\u001b71\u0001A!\u0002\u000f\u0001\u0005\u0004i\"!A!\u0012\u0005y!\u0003CA\u0010#\u001b\u0005\u0001#\"A\u0011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\r\u0002#a\u0002(pi\"Lgn\u001a\t\u0003?\u0015J!A\n\u0011\u0003\u0007\u0005s\u0017\u0010\u0005\u0002)S5\tA!\u0003\u0002+\t\tQAj\\4TkB\u0004xN\u001d;\t\u00131\u0002!Q1A\u0005\u0002\ti\u0013\u0001C:fiRLgnZ:\u0016\u00039\u00022a\f\u0019\u001a\u001b\u0005\u0011\u0011BA\u0019\u0003\u0005e!\u0015\r^1cCN,\u0007+\u001e2mSNDWM]*fiRLgnZ:\t\u0011M\u0002!\u0011!Q\u0001\n9\n\u0011b]3ui&twm\u001d\u0011\t\u0013U\u0002!Q1A\u0005\u0002\t1\u0014aA:rYV\tq\u0007E\u00020qeI!!\u000f\u0002\u0003\u001dM#(/Z1n%\u0016\fG-_*R\u0019\"A1\b\u0001B\u0001B\u0003%q'\u0001\u0003tc2\u0004\u0003\"C\u001f\u0001\u0005\u000b\u0007I\u0011\u0001\u0002?\u00035\t7/\u001f8d\u000bb,7-\u001e;peV\tq\b\u0005\u00020\u0001&\u0011\u0011I\u0001\u0002\u000e\u0003NLhnY#yK\u000e,Ho\u001c:\t\u0011\r\u0003!\u0011!Q\u0001\n}\na\"Y:z]\u000e,\u00050Z2vi>\u0014\b\u0005\u0003\u0004F\u0001\u0011\u0005!AR\u0001\u0007y%t\u0017\u000e\u001e \u0015\t\u001dC\u0015J\u0013\t\u0004_\u0001I\u0002\"\u0002\u0017E\u0001\u0004q\u0003\"B\u001bE\u0001\u00049\u0004\"B\u001fE\u0001\u0004y\u0004\"\u0002'\u0001\t\u0003j\u0015!C:vEN\u001c'/\u001b2f)\tq\u0015\u000b\u0005\u0002 
\u001f&\u0011\u0001\u000b\t\u0002\u0005+:LG\u000fC\u0003S\u0017\u0002\u00071+\u0001\u0006tk\n\u001c8M]5cKJ\u0004$\u0001\u0016-\u0011\u0007I)v+\u0003\u0002W'\tQ1+\u001e2tGJL'-\u001a:\u0011\u0005iAF!C-R\u0003\u0003\u0005\tQ!\u0001[\u0005\ryF%M\t\u00033\u0011\u0002")
/* loaded from: input_file:scalikejdbc/streams/DatabasePublisher.class */
/**
 * Reactive Streams {@link Publisher} that hands each subscriber a fresh
 * {@code DatabaseSubscription} built from this publisher and the subscriber.
 *
 * <p>This class is the Java view of a Scala class mixing in {@link LogSupport};
 * accessor methods therefore follow Scala naming ({@code settings()}, {@code sql()}, ...).
 *
 * @param <A> element type emitted to subscribers
 */
public class DatabasePublisher<A> implements Publisher<A>, LogSupport {
    private final DatabasePublisherSettings<A> settings;
    private final StreamReadySQL<A> sql;
    private final AsyncExecutor asyncExecutor;

    // Deliberately not final: the value is installed through the LogSupport trait setter
    // below rather than assigned directly in the constructor, which Java forbids for
    // final fields.
    private Log log;

    /** Returns the logger installed through the {@link LogSupport} trait setter. */
    public Log log() {
        return this.log;
    }

    /**
     * Trait setter for the {@code log} member of {@link LogSupport}.
     * NOTE(review): presumably invoked only from {@code LogSupport.$init$} during
     * construction - confirm against the trait's compiled form.
     *
     * @param log logger instance to store
     */
    public void scalikejdbc$LogSupport$_setter_$log_$eq(Log log) {
        this.log = log;
    }

    /** Returns the publisher settings given to the constructor. */
    public DatabasePublisherSettings<A> settings() {
        return this.settings;
    }

    /** Returns the stream-ready SQL object given to the constructor. */
    public StreamReadySQL<A> sql() {
        return this.sql;
    }

    /** Returns the asynchronous executor given to the constructor. */
    public AsyncExecutor asyncExecutor() {
        return this.asyncExecutor;
    }

    /**
     * Creates a subscription for {@code subscriber}, signals {@code onSubscribe}, and then
     * starts streaming and prepares the completion handler.
     *
     * <p>Non-fatal failures (as classified by Scala's {@code NonFatal}) never propagate to the
     * caller: they are logged and, once the subscription exists, forwarded to
     * {@code DatabaseSubscription#onError}. Fatal throwables are rethrown unchanged.
     *
     * @param subscriber the subscriber to attach; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    public void subscribe(Subscriber<? super A> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("given Subscriber to DatabasePublisher#subscribe is null. (Reactive Streams spec, 1.9)");
        }
        try {
            DatabaseSubscription databaseSubscription = new DatabaseSubscription(this, subscriber);
            try {
                // Kept outside the preparation try so that a failing onSubscribe is reported
                // by the handler whose message actually names onSubscribe.
                subscriber.onSubscribe(databaseSubscription);
                try {
                    databaseSubscription.startNewStreaming();
                    databaseSubscription.prepareCompletionHandler();
                    log().info(() -> "Database stream requested by subscriber: " + subscriber + " is ready");
                } catch (Throwable th) {
                    Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                    if (nonFatal.isEmpty()) {
                        throw th;
                    }
                    Throwable cause = nonFatal.get();
                    log().warn(() -> "Failed to make preparation for subscriber: " + subscriber, cause);
                    databaseSubscription.onError(cause);
                }
            } catch (Throwable th) {
                Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
                if (nonFatal.isEmpty()) {
                    throw th;
                }
                Throwable cause = nonFatal.get();
                log().warn(() -> "Subscriber#onSubscribe for subscriber: " + subscriber
                        + " unexpectedly failed because " + cause.getMessage(), cause);
                databaseSubscription.onError(cause);
            }
        } catch (Throwable th) {
            Option<Throwable> nonFatal = NonFatal$.MODULE$.unapply(th);
            if (nonFatal.isEmpty()) {
                throw th;
            }
            Throwable cause = nonFatal.get();
            // Last line of defence: anything non-fatal that escaped the handlers above is
            // swallowed (debug-logged only) so subscribe() itself returns normally.
            if (log().isDebugEnabled()) {
                log().debug(() -> "Ignore exceptions for subscriber: " + subscriber
                        + " to obey Reactive Streams spec 1-9", cause);
            }
        }
    }

    /**
     * Stores the collaborators and runs the {@link LogSupport} trait initializer.
     *
     * @param databasePublisherSettings settings exposed via {@link #settings()}
     * @param streamReadySQL            SQL object exposed via {@link #sql()}
     * @param asyncExecutor             executor exposed via {@link #asyncExecutor()}
     */
    public DatabasePublisher(DatabasePublisherSettings<A> databasePublisherSettings, StreamReadySQL<A> streamReadySQL, AsyncExecutor asyncExecutor) {
        this.settings = databasePublisherSettings;
        this.sql = streamReadySQL;
        this.asyncExecutor = asyncExecutor;
        LogSupport.$init$(this);
    }
}
