package io.datakernel.stream.net;

import io.datakernel.annotation.Nullable;
import io.datakernel.async.Callback;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.ParseException;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.util.Preconditions;
import io.datakernel.util.Taggable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/stream/net/MessagingWithBinaryStreaming.class */
/**
 * {@link Messaging} implementation on top of an {@link AsyncTcpSocket} that exchanges
 * serialized messages and can additionally switch either direction of the connection
 * to a raw {@link ByteBuf} stream.
 *
 * <p>Incoming bytes are accumulated in {@code readBuf} until the serializer can decode a
 * message; once {@link #receiveBinaryStream()} has been called, incoming bytes are handed
 * to the binary stream producer instead. Outgoing messages are written to the socket and
 * their stages are completed from {@link #onWrite()}.
 *
 * <p>{@link #onRead(ByteBuf)} asserts that it runs in the eventloop thread captured at
 * construction time.
 *
 * @param <I> type of received messages
 * @param <O> type of sent messages
 */
public final class MessagingWithBinaryStreaming<I, O> implements AsyncTcpSocket.EventHandler, Messaging<I, O>, Taggable {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Eventloop eventloop = Eventloop.getCurrentEventloop();

    private final AsyncTcpSocket socket;
    private final MessagingSerializer<I, O> serializer;

    /** Bytes received from the socket that have not been consumed yet. */
    @Nullable
    private ByteBuf readBuf;

    /** Callback of the {@link #receive()} call currently in flight, if any. */
    @Nullable
    private Callback<I> receiveMessageCallback;

    /** Stages of sent messages that have not been confirmed by {@link #onWrite()} yet. */
    private List<SettableStage<Void>> writeCallbacks = new ArrayList<>();

    private boolean readEndOfStream;
    private boolean writeEndOfStreamRequest;
    private SocketStreamProducer socketReader;
    private SocketStreamConsumer socketWriter;

    /** Error the socket was closed with; once set, new operations fail with it. */
    private Exception closedException;

    private boolean readDone;
    private boolean writeDone;

    @Nullable
    private Object tag;

    private MessagingWithBinaryStreaming(AsyncTcpSocket asyncTcpSocket, MessagingSerializer<I, O> messagingSerializer) {
        this.socket = asyncTcpSocket;
        this.serializer = messagingSerializer;
    }

    /**
     * Creates a messaging instance bound to the given socket and serializer.
     *
     * @param asyncTcpSocket      socket used for all reads and writes
     * @param messagingSerializer serializer used to encode outgoing and decode incoming messages
     * @return a new instance
     */
    public static <I, O> MessagingWithBinaryStreaming<I, O> create(AsyncTcpSocket asyncTcpSocket, MessagingSerializer<I, O> messagingSerializer) {
        return new MessagingWithBinaryStreaming<>(asyncTcpSocket, messagingSerializer);
    }

    /**
     * Requests the next incoming message.
     *
     * @return a stage completed with the decoded message, with {@code null} when end of
     *         stream is reached with nothing buffered, or exceptionally on a parse or
     *         socket error
     * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) when
     *         a binary stream is being received or another receive is already pending
     */
    @Override
    public Stage<I> receive() {
        Preconditions.checkState(socketReader == null, "Cannot try to receive a message while receiving raw binary data");
        Preconditions.checkState(receiveMessageCallback == null, "Cannot try to receive a message while already trying to receive a message");
        if (closedException != null) {
            return Stage.ofException(closedException);
        }
        SettableStage<I> result = SettableStage.create();
        receiveMessageCallback = result;
        if (readBuf == null && !readEndOfStream) {
            socket.read();
        } else {
            // Data (or end of stream) is already available: process it in a posted task,
            // re-checking the state because it may have changed by the time the task runs.
            eventloop.post(() -> {
                if (socketReader == null && receiveMessageCallback != null) {
                    tryReadMessage();
                }
            });
        }
        return result;
    }

    /**
     * Tries to decode one message from {@code readBuf} and hand it to the pending receive
     * callback; completes the callback with {@code null} when end of stream was reached and
     * nothing is buffered.
     */
    private void tryReadMessage() {
        if (readBuf != null && receiveMessageCallback != null) {
            try {
                I message = serializer.tryDeserialize(readBuf);
                if (message == null) {
                    // Not enough data for a complete message yet.
                    // NOTE(review): if end of stream was already reached with a partial message
                    // buffered, the pending callback appears to stay uncompleted - confirm.
                    socket.read();
                } else {
                    if (!readBuf.canRead()) {
                        readBuf.recycle();
                        readBuf = null;
                        if (!readEndOfStream) {
                            socket.read();
                        }
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("received message {}: {}", message, this);
                    }
                    takeReadCallback().set(message);
                }
            } catch (ParseException e) {
                logger.warn("error trying to deserialize a message: " + this, e);
                takeReadCallback().setException(e);
            }
        }
        if (readBuf == null && readEndOfStream && receiveMessageCallback != null) {
            logger.warn("end of stream reached while trying to read a message: {}", this);
            takeReadCallback().set(null);
        }
    }

    /** Detaches and returns the pending receive callback, which must be present. */
    private Callback<I> takeReadCallback() {
        Callback<I> callback = receiveMessageCallback;
        receiveMessageCallback = null;
        assert callback != null;
        return callback;
    }

    /**
     * Serializes the message and writes it to the socket.
     *
     * @param o message to send
     * @return a stage completed from {@link #onWrite()}, or failed when the socket has
     *         been closed with an error
     */
    @Override
    public Stage<Void> send(O o) {
        Preconditions.checkState(socketWriter == null, "Cannot send messages while sending raw binary data");
        Preconditions.checkState(!writeEndOfStreamRequest, "Cannot send messages after end of stream was sent");
        if (closedException != null) {
            logger.warn("failed to send message " + o + ": " + this, closedException);
            return Stage.ofException(closedException);
        }
        if (logger.isTraceEnabled()) {
            logger.trace("sending message {}: {}", o, this);
        }
        SettableStage<Void> result = SettableStage.create();
        writeCallbacks.add(result);
        socket.write(serializer.serialize(o));
        return result;
    }

    /**
     * Writes end of stream to the socket; no further messages may be sent afterwards.
     *
     * @return a stage completed together with the still pending writes, or an already
     *         completed stage when nothing is pending
     */
    @Override
    public Stage<Void> sendEndOfStream() {
        Preconditions.checkState(socketWriter == null, "Cannot send end of stream while sending raw binary data");
        Preconditions.checkState(!writeEndOfStreamRequest, "Cannot send end of stream after end of stream was already sent");
        if (closedException != null) {
            logger.warn("failed to send end of stream: " + this, closedException);
            return Stage.ofException(closedException);
        }
        logger.trace("sending end of stream: {}", this);
        writeEndOfStreamRequest = true;
        socket.writeEndOfStream();
        if (writeCallbacks.isEmpty()) {
            // Nothing is waiting for a write confirmation, so the write side is finished.
            writeDone = true;
            closeIfDone();
            return Stage.of(null);
        }
        SettableStage<Void> result = SettableStage.create();
        writeCallbacks.add(result);
        return result;
    }

    /**
     * Switches the write side to raw binary streaming.
     *
     * @return a consumer writing to the socket, or a consumer closed with the recorded
     *         error when the socket has already failed
     */
    @Override
    public StreamConsumerWithResult<ByteBuf, Void> sendBinaryStream() {
        Preconditions.checkState(socketWriter == null, "Cannot send raw binary data while already sending raw binary data");
        Preconditions.checkState(!writeEndOfStreamRequest, "Cannot send raw binary data after end of stream was sent");
        // NOTE(review): stages of messages still awaiting write confirmation are discarded
        // here without being completed - confirm that callers do not rely on them.
        writeCallbacks.clear();
        if (closedException != null) {
            logger.warn("failed to send binary data: " + this, closedException);
            return StreamConsumer.<ByteBuf>closingWithError(closedException).withEndOfStreamAsResult();
        }
        logger.trace("sending binary data: {}", this);
        socketWriter = SocketStreamConsumer.create(socket);
        return socketWriter.withResult(socketWriter.getSentStage());
    }

    /**
     * Switches the read side to raw binary streaming; bytes already buffered (and an
     * already received end of stream) are forwarded to the producer from a posted task.
     *
     * @return a producer reading from the socket, or a producer closed with the recorded
     *         error when the socket has already failed
     */
    @Override
    public StreamProducerWithResult<ByteBuf, Void> receiveBinaryStream() {
        Preconditions.checkState(socketReader == null, "Cannot receive raw binary data while already receiving raw binary data");
        Preconditions.checkState(receiveMessageCallback == null, "Cannot receive raw binary data while trying to receive a message");
        if (closedException != null) {
            logger.warn("failed to receive binary data: " + this, closedException);
            return StreamProducer.<ByteBuf>closingWithError(closedException).withEndOfStreamAsResult();
        }
        logger.trace("receiving binary data: {}", this);
        socketReader = SocketStreamProducer.create(socket);
        if (readBuf != null || readEndOfStream) {
            eventloop.post(() -> {
                if (readBuf != null) {
                    readUnconsumedBuf();
                }
                if (readEndOfStream) {
                    socketReader.onReadEndOfStream();
                }
            });
        }
        return socketReader.withEndOfStreamAsResult();
    }

    /** Closes the socket and recycles any buffered, unconsumed input. */
    @Override
    public void close() {
        logger.trace("closing: {}", this);
        socket.close();
        recycleReadBuf();
    }

    /** Socket event: starts reading from the socket. */
    public void onRegistered() {
        socket.read();
    }

    /** Hands the buffered input over to the binary stream producer. */
    private void readUnconsumedBuf() {
        assert readBuf != null;
        socketReader.onRead(readBuf);
        readBuf = null;
    }

    /** Recycles the buffered input, if any. */
    private void recycleReadBuf() {
        if (readBuf != null) {
            readBuf.recycle();
            readBuf = null;
        }
    }

    /**
     * Socket event: routes received bytes to the binary stream producer when one is
     * active, otherwise appends them to {@code readBuf} and tries to decode a message.
     *
     * @param byteBuf received bytes
     */
    public void onRead(ByteBuf byteBuf) {
        assert eventloop.inEventloopThread();
        if (socketReader != null) {
            // Forward previously buffered bytes first so that the order is preserved.
            if (readBuf != null) {
                readUnconsumedBuf();
            }
            socketReader.onRead(byteBuf);
            return;
        }
        if (readBuf == null) {
            readBuf = ByteBufPool.allocate(Math.max(8192, byteBuf.writeRemaining()));
        }
        readBuf = ByteBufPool.append(readBuf, byteBuf);
        tryReadMessage();
    }

    /**
     * Socket event: records end of input, notifies the pending receive or the binary
     * stream producer, and closes the socket if the write side is finished as well.
     */
    public void onReadEndOfStream() {
        logger.trace("onShutdownInput: {}", this);
        readEndOfStream = true;
        if (socketReader != null) {
            if (readBuf != null) {
                readUnconsumedBuf();
            }
            socketReader.onReadEndOfStream();
        } else {
            tryReadMessage();
        }
        readDone = true;
        closeIfDone();
    }

    /** Closes the socket once both the read and the write side are finished. */
    private void closeIfDone() {
        if (readDone && writeDone) {
            socket.close();
        }
    }

    /**
     * Socket event: completes the stages of pending message writes, or delegates to the
     * binary stream consumer when one is active; closes the socket if both sides are done.
     */
    public void onWrite() {
        if (socketWriter != null) {
            socketWriter.onWrite();
            if (socketWriter.getStatus().isClosed()) {
                writeDone = true;
            }
        } else {
            // Swap the list before completing the stages: handlers may call send() again.
            List<SettableStage<Void>> completed = writeCallbacks;
            writeCallbacks = new ArrayList<>();
            for (SettableStage<Void> callback : completed) {
                callback.set(null);
            }
            if (writeEndOfStreamRequest) {
                writeDone = true;
            }
        }
        closeIfDone();
    }

    /**
     * Socket event: propagates the error to every party that may be waiting on the
     * socket - both binary streams, the pending receive callback and all pending write
     * stages - and records it so that subsequent operations fail with it.
     *
     * @param exc error the socket was closed with
     */
    public void onClosedWithError(Exception exc) {
        logger.warn("closing with error: " + this, exc);
        // Always record the error, even when a binary stream is active, so that later
        // calls on this instance fail instead of operating on the closed socket.
        closedException = exc;
        if (socketReader != null) {
            socketReader.closeWithError(exc);
        }
        if (socketWriter != null) {
            socketWriter.closeWithError(exc);
        }
        if (receiveMessageCallback != null) {
            // Detach the callback first so that a task already posted by receive()
            // cannot complete it a second time.
            takeReadCallback().setException(exc);
        }
        if (!writeCallbacks.isEmpty()) {
            List<SettableStage<Void>> failed = writeCallbacks;
            writeCallbacks = new ArrayList<>();
            for (SettableStage<Void> callback : failed) {
                callback.setException(exc);
            }
        }
        recycleReadBuf();
    }

    /** Sets the tag used by {@link #toString()} to identify this instance. */
    public void setTag(@Nullable Object obj) {
        this.tag = obj;
    }

    /** Returns the tag set with {@link #setTag(Object)}, or {@code null} if none was set. */
    @Nullable
    public Object getTag() {
        return tag;
    }

    /** Describes this instance by its tag when one is set, otherwise by its socket. */
    @Override
    public String toString() {
        return "Messaging" + (tag != null ? '(' + tag.toString() + ')' : "{socket=" + socket + '}');
    }
}
