package io.datakernel.stream.net;

import com.google.common.base.Preconditions;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.eventloop.NioEventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.AbstractStreamProducer;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumerSwitcher;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamForwarder;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerSwitcher;
import io.datakernel.stream.processor.StreamDeserializer;
import io.datakernel.stream.processor.StreamSerializer;
import java.nio.channels.SocketChannel;
import java.util.HashMap;

/* loaded from: input_file:io/datakernel/stream/net/StreamMessagingConnection.class */
public class StreamMessagingConnection<I, O> extends TcpStreamSocketConnection implements Messaging<O>, StreamDataReceiver<I> {
    private MessagingStarter<O> starter;
    protected final HashMap<Class<? extends I>, MessagingHandler<? extends I, O>> handlers;
    private StreamConsumerSwitcher<ByteBuf> socketReaderSwitcher;
    private StreamProducerSwitcher<ByteBuf> socketWriterSwitcher;
    private final StreamDeserializer<I> streamDeserializer;
    private final StreamSerializer<O> streamSerializer;
    private final StreamMessagingConnection<I, O>.MessageConsumer messageConsumer;
    private final StreamMessagingConnection<I, O>.MessageProducer messageProducer;
    private StreamDataReceiver<O> output;

    /* loaded from: input_file:io/datakernel/stream/net/StreamMessagingConnection$MessageConsumer.class */
    private class MessageConsumer extends AbstractStreamConsumer<I> {
        public MessageConsumer() {
            super(StreamMessagingConnection.this.eventloop);
        }

        @Override // io.datakernel.stream.StreamConsumer
        public StreamDataReceiver<I> getDataReceiver() {
            return StreamMessagingConnection.this;
        }

        @Override // io.datakernel.stream.StreamConsumer
        public void onEndOfStream() {
            if (StreamMessagingConnection.this.socketReaderSwitcher.getCurrentConsumer() == StreamMessagingConnection.this.streamDeserializer) {
                StreamMessagingConnection.this.shutdown();
            }
        }

        @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
        public void onError(Exception exc) {
        }
    }

    /* loaded from: input_file:io/datakernel/stream/net/StreamMessagingConnection$MessageProducer.class */
    private class MessageProducer extends AbstractStreamProducer<O> {
        public MessageProducer() {
            super(StreamMessagingConnection.this.eventloop);
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onSuspended() {
        }

        @Override // io.datakernel.stream.AbstractStreamProducer
        protected void onResumed() {
        }
    }

    public StreamMessagingConnection(NioEventloop nioEventloop, SocketChannel socketChannel, StreamDeserializer<I> streamDeserializer, StreamSerializer<O> streamSerializer) {
        super(nioEventloop, socketChannel);
        this.handlers = new HashMap<>();
        this.messageConsumer = new MessageConsumer();
        this.messageProducer = new MessageProducer();
        this.streamDeserializer = streamDeserializer;
        this.streamSerializer = streamSerializer;
        this.socketReaderSwitcher = new StreamConsumerSwitcher<>(nioEventloop, this.streamDeserializer);
        this.socketWriterSwitcher = new StreamProducerSwitcher<>(nioEventloop, this.streamSerializer);
        this.streamDeserializer.streamTo(this.messageConsumer);
        this.messageProducer.streamTo(this.streamSerializer);
    }

    public <T extends I> StreamMessagingConnection<I, O> addHandler(Class<T> cls, MessagingHandler<T, O> messagingHandler) {
        this.handlers.put(cls, messagingHandler);
        return this;
    }

    public StreamMessagingConnection<I, O> addStarter(MessagingStarter<O> messagingStarter) {
        this.starter = messagingStarter;
        return this;
    }

    @Override // io.datakernel.stream.net.TcpStreamSocketConnection
    protected void wire(StreamProducer<ByteBuf> streamProducer, StreamConsumer<ByteBuf> streamConsumer) {
        streamProducer.streamTo(this.socketReaderSwitcher);
        this.socketWriterSwitcher.streamTo(streamConsumer);
        this.output = this.messageProducer.getDownstreamDataReceiver();
        onStart();
    }

    protected void onStart() {
        if (this.starter != null) {
            this.starter.onStart(this);
        }
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(I i) {
        Class<?> cls = i.getClass();
        MessagingHandler<? extends I, O> messagingHandler = this.handlers.get(cls);
        Preconditions.checkNotNull(messagingHandler, "No handler for message type: %s", new Object[]{cls});
        messagingHandler.onMessage(i, this);
    }

    @Override // io.datakernel.stream.net.Messaging
    public void sendMessage(O o) {
        this.output.onData(o);
    }

    @Override // io.datakernel.stream.net.Messaging
    public StreamConsumer<ByteBuf> binarySocketWriter() {
        StreamForwarder streamForwarder = new StreamForwarder(this.eventloop);
        this.streamSerializer.flush();
        this.socketWriterSwitcher.switchProducerTo(streamForwarder);
        return streamForwarder;
    }

    @Override // io.datakernel.stream.net.Messaging
    public StreamProducer<ByteBuf> binarySocketReader() {
        StreamForwarder streamForwarder = new StreamForwarder(this.eventloop);
        this.socketReaderSwitcher.switchConsumerTo(streamForwarder);
        this.streamDeserializer.drainBuffersTo(streamForwarder);
        return streamForwarder;
    }

    @Override // io.datakernel.stream.net.Messaging
    public void shutdown() {
        this.eventloop.post(new Runnable() { // from class: io.datakernel.stream.net.StreamMessagingConnection.1
            @Override // java.lang.Runnable
            public void run() {
                StreamMessagingConnection.this.messageConsumer.closeUpstream();
                StreamMessagingConnection.this.messageProducer.sendEndOfStream();
            }
        });
    }

    @Override // io.datakernel.stream.net.Messaging
    public void shutdownReader() {
        Preconditions.checkState(this.socketReaderSwitcher.getCurrentConsumer() == this.streamDeserializer, "SocketReader is rewired to another stream");
        this.eventloop.post(new Runnable() { // from class: io.datakernel.stream.net.StreamMessagingConnection.2
            @Override // java.lang.Runnable
            public void run() {
                StreamMessagingConnection.this.messageConsumer.closeUpstream();
            }
        });
    }

    @Override // io.datakernel.stream.net.Messaging
    public void shutdownWriter() {
        Preconditions.checkState(this.socketWriterSwitcher.getCurrentProducer() == this.streamSerializer, "SocketWriter is rewired to another stream");
        this.eventloop.post(new Runnable() { // from class: io.datakernel.stream.net.StreamMessagingConnection.3
            @Override // java.lang.Runnable
            public void run() {
                StreamMessagingConnection.this.messageProducer.sendEndOfStream();
            }
        });
    }
}
