package com.github.pgasync.impl.protocol;

import com.github.pgasync.DatabaseConfig;
import com.github.pgasync.SqlException;
import com.github.pgasync.impl.NettyScheduler;
import com.github.pgasync.impl.message.Authentication;
import com.github.pgasync.impl.message.CommandComplete;
import com.github.pgasync.impl.message.ErrorResponse;
import com.github.pgasync.impl.message.Message;
import com.github.pgasync.impl.message.NotificationResponse;
import com.github.pgasync.impl.message.PasswordMessage;
import com.github.pgasync.impl.message.ReadyForQuery;
import com.github.pgasync.impl.message.SSLHandshake;
import com.github.pgasync.impl.message.StartupMessage;
import com.github.pgasync.impl.message.Terminate;
import com.nurkiewicz.typeof.ThenIs;
import com.nurkiewicz.typeof.TypeOf;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Emitter;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
import rx.SingleSubscriber;
import rx.subscriptions.Subscriptions;

/* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolStream.class */
public class ProtocolStream {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolStream.class);
    private final EventLoopGroup group;
    private final DatabaseConfig config;
    private ChannelHandlerContext ctx;
    private boolean dirty;
    private Scheduler scheduler;
    private final Queue<PgConsumer> subscribers = new LinkedBlockingDeque();
    private final ConcurrentMap<String, List<StreamConsumer<String>>> listeners = new ConcurrentHashMap();
    private final GenericFutureListener<Future<? super Object>> onError = future -> {
        if (future.isSuccess()) {
            return;
        }
        handleError(future.cause());
    };

    /* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolStream$InboundChannelInitializer.class */
    private class InboundChannelInitializer extends ChannelInboundHandlerAdapter {
        private final StartupMessage startup;

        InboundChannelInitializer(StartupMessage startupMessage) {
            this.startup = startupMessage;
        }

        public void channelActive(ChannelHandlerContext channelHandlerContext) {
            ProtocolStream.this.ctx = channelHandlerContext;
            ProtocolStream.this.scheduler = NettyScheduler.forEventExecutor(ProtocolStream.this.ctx.executor());
            if (ProtocolStream.this.config.useSsl()) {
                ProtocolStream.this.write(SSLHandshake.INSTANCE);
            } else {
                writeStartupAndFixPipeline(channelHandlerContext);
            }
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) {
            TypeOf.whenTypeOf(obj).is(SslHandshakeCompletionEvent.class).then(sslHandshakeCompletionEvent -> {
                if (sslHandshakeCompletionEvent.isSuccess()) {
                    writeStartupAndFixPipeline(channelHandlerContext);
                } else {
                    channelHandlerContext.fireExceptionCaught(new SqlException("Failed to initialise SSL"));
                }
            });
        }

        private void writeStartupAndFixPipeline(ChannelHandlerContext channelHandlerContext) {
            ProtocolStream.this.write(this.startup);
            channelHandlerContext.pipeline().remove(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolStream$PgConsumer.class */
    public abstract class PgConsumer implements Consumer<Message> {
        final String query;

        PgConsumer(String str) {
            this.query = str;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract void error(Throwable th);

        void closeStream() {
            ProtocolStream.LOG.warn("Closing channel due to premature cancellation [{}]", this.query);
            ProtocolStream.this.subscribers.remove(this);
            ProtocolStream.this.dirty = true;
            ProtocolStream.this.ctx.channel().close();
        }
    }

    /* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolStream$ProtocolConsumer.class */
    abstract class ProtocolConsumer<T> extends PgConsumer {
        final SingleSubscriber<T> subscriber;
        final AtomicBoolean done;

        /* JADX WARN: Multi-variable type inference failed */
        ProtocolConsumer(SingleSubscriber<?> singleSubscriber, String str) {
            super(str);
            this.done = new AtomicBoolean();
            this.subscriber = singleSubscriber;
            singleSubscriber.add(Subscriptions.create(this::unsubscribe));
        }

        void complete(T t) {
            if (this.done.get()) {
                return;
            }
            this.done.set(true);
            this.subscriber.onSuccess(t);
        }

        void complete() {
            complete(null);
        }

        @Override // com.github.pgasync.impl.protocol.ProtocolStream.PgConsumer
        void error(Throwable th) {
            this.done.set(true);
            this.subscriber.onError(th);
        }

        void unsubscribe() {
            if (this.done.get()) {
                return;
            }
            closeStream();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolStream$StreamConsumer.class */
    public abstract class StreamConsumer<T> extends PgConsumer {
        final Emitter<T> subscriber;
        final AtomicBoolean done;

        StreamConsumer(Emitter<T> emitter, String str) {
            super(str);
            this.done = new AtomicBoolean();
            this.subscriber = emitter;
            emitter.setSubscription(Subscriptions.create(this::unsubscribe));
        }

        void complete() {
            if (this.done.get()) {
                return;
            }
            this.done.set(true);
            this.subscriber.onCompleted();
        }

        @Override // com.github.pgasync.impl.protocol.ProtocolStream.PgConsumer
        public void error(Throwable th) {
            this.done.set(true);
            this.subscriber.onError(th);
        }

        void unsubscribe() {
            if (this.done.get()) {
                return;
            }
            closeStream();
        }
    }

    public ProtocolStream(EventLoopGroup eventLoopGroup, DatabaseConfig databaseConfig) {
        this.group = eventLoopGroup;
        this.config = databaseConfig;
    }

    public Single<Authentication> connect(StartupMessage startupMessage) {
        return Single.create(singleSubscriber -> {
            this.subscribers.add(new ProtocolConsumer<Authentication>(singleSubscriber, "CONNECT") { // from class: com.github.pgasync.impl.protocol.ProtocolStream.1
                @Override // java.util.function.Consumer
                public void accept(Message message) {
                    TypeOf.whenTypeOf(message).is(ErrorResponse.class).then(errorResponse -> {
                        error(ProtocolStream.this.toSqlException(errorResponse));
                    }).is(ReadyForQuery.class).then(readyForQuery -> {
                        complete(new Authentication(true, null));
                    }).is(Authentication.class).then(this::handleAuthRequest).orElse(message2 -> {
                        error(new SqlException("Unexpected message at startup stage: " + message2));
                    });
                }

                private void handleAuthRequest(Authentication authentication) {
                    if (authentication.success()) {
                        return;
                    }
                    ProtocolStream.this.subscribers.remove();
                    complete(authentication);
                }
            });
            new Bootstrap().group(this.group).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.config.connectTimeout())).channel(NioSocketChannel.class).handler(new ProtocolInitializer(this.config, new InboundChannelInitializer(startupMessage), new ProtocolHandler(this.subscribers, this.listeners, this::handleError))).connect(this.config.address()).addListener(this.onError);
        });
    }

    public Completable authenticate(PasswordMessage passwordMessage) {
        return Single.create(singleSubscriber -> {
            this.subscribers.add(new ProtocolConsumer<Void>(singleSubscriber, "AUTHENTICATE") { // from class: com.github.pgasync.impl.protocol.ProtocolStream.2
                @Override // java.util.function.Consumer
                public void accept(Message message) {
                    TypeOf.whenTypeOf(message).is(ErrorResponse.class).then(errorResponse -> {
                        error(ProtocolStream.this.toSqlException(errorResponse));
                    }).is(Authentication.class).then(this::handleAuthResponse).is(ReadyForQuery.class).then(readyForQuery -> {
                        complete();
                    });
                }

                private void handleAuthResponse(Authentication authentication) {
                    if (authentication.success()) {
                        return;
                    }
                    error(new SqlException("Failed to authenticate"));
                }
            });
            write(passwordMessage);
        }).subscribeOn(this.scheduler).toCompletable();
    }

    public Observable<Message> command(Message... messageArr) {
        return messageArr.length == 0 ? Observable.error(new IllegalArgumentException("No messages to send")) : !isConnected() ? Observable.error(new IllegalStateException("Channel is closed [" + messageArr[0] + "]")) : Observable.unsafeCreate(BackPressuredEmitter.create(emitter -> {
            StreamConsumer<Message> streamConsumer = new StreamConsumer<Message>(emitter, messageArr[0].toString()) { // from class: com.github.pgasync.impl.protocol.ProtocolStream.3
                SqlException exception;

                @Override // java.util.function.Consumer
                public void accept(Message message) {
                    ThenIs is = TypeOf.whenTypeOf(message).is(ErrorResponse.class).then(this::handleError).is(ReadyForQuery.class).then(readyForQuery -> {
                        handleReady();
                    }).is(CommandComplete.class).then(this::handleCompletion).is(Message.class);
                    Emitter emitter = emitter;
                    emitter.getClass();
                    is.then((v1) -> {
                        r1.onNext(v1);
                    });
                }

                private void handleCompletion(CommandComplete commandComplete) {
                    ProtocolStream.this.enableAutoRead();
                    emitter.onNext(commandComplete);
                }

                private void handleReady() {
                    if (this.exception == null) {
                        complete();
                    } else {
                        error(this.exception);
                    }
                }

                private void handleError(ErrorResponse errorResponse) {
                    this.exception = ProtocolStream.this.toSqlException(errorResponse);
                    ProtocolStream.this.enableAutoRead();
                }
            };
            ensureInLoop(() -> {
                this.subscribers.add(streamConsumer);
                write(messageArr);
                disableAutoRead();
                readNext();
            });
        }, this::readNext));
    }

    public Observable<String> listen(String str) {
        return !isConnected() ? Observable.error(new IllegalStateException("Channel is closed [LISTEN]")) : Observable.unsafeCreate(BackPressuredEmitter.create(emitter -> {
            StreamConsumer<String> streamConsumer = new StreamConsumer<String>(emitter, "LISTEN") { // from class: com.github.pgasync.impl.protocol.ProtocolStream.4
                @Override // java.util.function.Consumer
                public void accept(Message message) {
                    ThenIs is = TypeOf.whenTypeOf(message).is(ErrorResponse.class).then(this::handleError).is(CommandComplete.class).then(commandComplete -> {
                        ProtocolStream.this.enableAutoRead();
                    }).is(NotificationResponse.class);
                    Emitter emitter = emitter;
                    is.then(notificationResponse -> {
                        emitter.onNext(notificationResponse.payload());
                    });
                }

                private void handleError(ErrorResponse errorResponse) {
                    emitter.onError(ProtocolStream.this.toSqlException(errorResponse));
                    ProtocolStream.this.enableAutoRead();
                }

                @Override // com.github.pgasync.impl.protocol.ProtocolStream.StreamConsumer
                protected void unsubscribe() {
                    ProtocolStream.this.enableAutoRead();
                    EventExecutor executor = ProtocolStream.this.ctx.executor();
                    String str2 = str;
                    executor.submit(() -> {
                        Optional.of(ProtocolStream.this.listeners.get(str2)).ifPresent(list -> {
                            list.remove(this);
                            if (list.isEmpty()) {
                                ProtocolStream.this.listeners.remove(str2);
                            }
                        });
                    });
                }
            };
            ensureInLoop(() -> {
                List<StreamConsumer<String>> orDefault = this.listeners.getOrDefault(str, new LinkedList());
                orDefault.add(streamConsumer);
                this.listeners.put(str, orDefault);
                disableAutoRead();
                readNext();
            });
        }, this::readNext));
    }

    public boolean isConnected() {
        return !this.dirty && ((Boolean) Optional.ofNullable(this.ctx).map(channelHandlerContext -> {
            return Boolean.valueOf(channelHandlerContext.channel().isOpen());
        }).orElse(false)).booleanValue();
    }

    public Completable close() {
        return Completable.create(completableSubscriber -> {
            this.dirty = true;
            handleError(new RuntimeException("Closing connection"));
            this.ctx.writeAndFlush(Terminate.INSTANCE).addListener(future -> {
                if (future.isSuccess()) {
                    completableSubscriber.onCompleted();
                } else {
                    completableSubscriber.onError(future.cause());
                }
            });
        }).subscribeOn(this.scheduler);
    }

    private void ensureInLoop(Runnable runnable) {
        if (this.ctx.executor().inEventLoop()) {
            runnable.run();
        } else {
            this.ctx.executor().submit(runnable);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void write(Message... messageArr) {
        for (Message message : messageArr) {
            LOG.trace("Writing: {}", message);
            this.ctx.write(message).addListener(this.onError);
        }
        this.ctx.flush();
    }

    private void readNext() {
        this.ctx.channel().read();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void enableAutoRead() {
        this.ctx.channel().config().setAutoRead(true);
    }

    private void disableAutoRead() {
        this.ctx.channel().config().setAutoRead(false);
    }

    private void handleError(Throwable th) {
        if (isConnected()) {
            Optional.ofNullable(this.subscribers.poll()).ifPresent(pgConsumer -> {
                pgConsumer.error(th);
            });
        } else {
            this.subscribers.forEach(pgConsumer2 -> {
                pgConsumer2.error(th);
            });
            this.subscribers.clear();
            this.listeners.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).forEach(streamConsumer -> {
                streamConsumer.error(th);
            });
            this.listeners.clear();
        }
        this.dirty = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public SqlException toSqlException(ErrorResponse errorResponse) {
        return new SqlException(errorResponse.level().name(), errorResponse.code(), errorResponse.message());
    }
}
