package com.github.pgasync.impl.protocol;

import com.github.pgasync.impl.message.Message;
import com.github.pgasync.impl.message.NotificationResponse;
import com.github.pgasync.impl.message.ReadyForQuery;
import com.github.pgasync.impl.protocol.ProtocolStream;
import com.nurkiewicz.typeof.TypeOf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/pgasync/impl/protocol/ProtocolHandler.class */
/**
 * Netty inbound handler that dispatches decoded backend messages to the
 * registered consumers.
 *
 * <ul>
 *   <li>{@link NotificationResponse} messages are delivered to every listener
 *       registered under the notification's channel name.</li>
 *   <li>{@link ReadyForQuery} is delivered to the subscriber at the head of the
 *       queue, which is removed from the queue in the same step.</li>
 *   <li>Any other {@link Message} is delivered to the subscriber at the head of
 *       the queue, which stays in the queue.</li>
 * </ul>
 *
 * <p>Errors, including the channel becoming inactive while subscribers are
 * still queued, are passed to the configured error handler.
 */
class ProtocolHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolHandler.class);

    /** Consumers waiting for query responses; the head receives incoming messages. */
    private final Queue<ProtocolStream.PgConsumer> subscribers;
    /** Notification listeners keyed by channel name. */
    private final Map<String, List<ProtocolStream.StreamConsumer<String>>> listeners;
    /** Receives every error reported through {@link #exceptionCaught}. */
    private final Consumer<Throwable> errorHandler;

    /**
     * Routes one inbound object by its type. The first matching type wins, so a
     * notification or ready-for-query message is never also handled by the
     * generic {@link Message} branch. Objects of any other type are ignored.
     * The object is not forwarded to later handlers in the pipeline.
     *
     * @param channelHandlerContext the handler context (unused)
     * @param obj the inbound object to dispatch
     * @throws Exception declared by the overridden Netty callback
     */
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        LOG.trace("Reading: {}", obj);
        // NOTE(review): poll()/peek() return null when no subscriber is queued, so a
        // response arriving with an empty queue fails with a NullPointerException here.
        TypeOf.whenTypeOf(obj)
                .is(NotificationResponse.class).then(this::publishNotification)
                .is(ReadyForQuery.class).then(readyForQuery -> {
                    this.subscribers.poll().accept(readyForQuery);
                })
                .is(Message.class).then(message -> {
                    this.subscribers.peek().accept(message);
                });
    }

    /**
     * Reports an {@link IOException} through {@link #exceptionCaught} when the
     * channel goes inactive while subscribers are still queued. Does nothing
     * when the queue is empty.
     *
     * @param channelHandlerContext the handler context, passed on to {@link #exceptionCaught}
     * @throws Exception if {@link #exceptionCaught} throws
     */
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.subscribers.isEmpty()) {
            return;
        }
        exceptionCaught(channelHandlerContext, new IOException("Channel state changed to inactive"));
    }

    /**
     * Hands the error to the configured error handler.
     *
     * @param channelHandlerContext the handler context (unused)
     * @param th the error to report
     * @throws Exception declared by the overridden Netty callback
     */
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.errorHandler.accept(th);
    }

    /**
     * Delivers a notification to every listener registered for its channel.
     * Notifications for channels without registered listeners are dropped.
     *
     * @param notificationResponse the notification received from the backend
     */
    private void publishNotification(NotificationResponse notificationResponse) {
        // Map.get returns null for channels nobody listens on; Optional.of would
        // throw NullPointerException there, so ofNullable is required.
        Optional.ofNullable(this.listeners.get(notificationResponse.channel())).ifPresent(list -> {
            list.forEach(streamConsumer -> {
                streamConsumer.accept(notificationResponse);
            });
        });
    }

    /**
     * Creates a handler over the given collections. They are stored by
     * reference, not copied.
     *
     * @param queue subscribers awaiting query responses
     * @param map notification listeners keyed by channel name
     * @param consumer handler invoked with every reported error
     */
    @ConstructorProperties({"subscribers", "listeners", "errorHandler"})
    public ProtocolHandler(Queue<ProtocolStream.PgConsumer> queue, Map<String, List<ProtocolStream.StreamConsumer<String>>> map, Consumer<Throwable> consumer) {
        this.subscribers = queue;
        this.listeners = map;
        this.errorHandler = consumer;
    }
}
