/*
 * Decompiled with CFR 0.152.
 */
package io.datakernel.stream.net;

import io.datakernel.async.Callback;
import io.datakernel.async.SettableStage;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.bytebuf.ByteBufPool;
import io.datakernel.eventloop.AsyncTcpSocket;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.exception.ParseException;
import io.datakernel.stream.StreamConsumer;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.net.Messaging;
import io.datakernel.stream.net.MessagingSerializer;
import io.datakernel.stream.net.SocketStreamConsumer;
import io.datakernel.stream.net.SocketStreamProducer;
import io.datakernel.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MessagingWithBinaryStreaming<I, O>
implements AsyncTcpSocket.EventHandler,
Messaging<I, O> {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final Eventloop eventloop = Eventloop.getCurrentEventloop();
    private final AsyncTcpSocket asyncTcpSocket;
    private final MessagingSerializer<I, O> serializer;
    private ByteBuf readBuf;
    private boolean readEndOfStream;
    private Callback<I> receiveMessageCallback;
    private List<SettableStage<Void>> writeCallbacks = new ArrayList<SettableStage<Void>>();
    private boolean writeEndOfStreamRequest;
    private SocketStreamProducer socketReader;
    private SocketStreamConsumer socketWriter;
    private Exception closedException;
    private boolean readDone;
    private boolean writeDone;

    private MessagingWithBinaryStreaming(AsyncTcpSocket asyncTcpSocket, MessagingSerializer<I, O> serializer) {
        this.asyncTcpSocket = asyncTcpSocket;
        this.serializer = serializer;
    }

    public static <I, O> MessagingWithBinaryStreaming<I, O> create(AsyncTcpSocket asyncTcpSocket, MessagingSerializer<I, O> serializer) {
        return new MessagingWithBinaryStreaming<I, O>(asyncTcpSocket, serializer);
    }

    @Override
    public Stage<I> receive() {
        SettableStage result;
        Preconditions.checkState((this.socketReader == null && this.receiveMessageCallback == null ? 1 : 0) != 0);
        if (this.closedException != null) {
            return Stage.ofException((Throwable)this.closedException);
        }
        this.receiveMessageCallback = result = SettableStage.create();
        if (this.readBuf != null || this.readEndOfStream) {
            this.eventloop.post(() -> {
                if (this.socketReader == null && this.receiveMessageCallback != null) {
                    this.tryReadMessage();
                }
            });
        } else {
            this.asyncTcpSocket.read();
        }
        return result;
    }

    private void tryReadMessage() {
        if (this.readBuf != null && this.receiveMessageCallback != null) {
            try {
                I message = this.serializer.tryDeserialize(this.readBuf);
                if (message == null) {
                    this.asyncTcpSocket.read();
                } else {
                    if (!this.readBuf.canRead()) {
                        this.readBuf.recycle();
                        this.readBuf = null;
                        if (!this.readEndOfStream) {
                            this.asyncTcpSocket.read();
                        }
                    }
                    this.takeReadCallback().set(message);
                }
            }
            catch (ParseException e) {
                this.takeReadCallback().setException((Throwable)e);
            }
        }
        if (this.readBuf == null && this.readEndOfStream && this.receiveMessageCallback != null) {
            this.takeReadCallback().set(null);
        }
    }

    private Callback<I> takeReadCallback() {
        Callback<I> callback = this.receiveMessageCallback;
        this.receiveMessageCallback = null;
        return callback;
    }

    @Override
    public Stage<Void> send(O msg) {
        Preconditions.checkState((this.socketWriter == null && !this.writeEndOfStreamRequest ? 1 : 0) != 0);
        if (this.closedException != null) {
            return Stage.ofException((Throwable)this.closedException);
        }
        SettableStage stage = SettableStage.create();
        this.writeCallbacks.add((SettableStage<Void>)stage);
        ByteBuf buf = this.serializer.serialize(msg);
        this.asyncTcpSocket.write(buf);
        return stage;
    }

    @Override
    public Stage<Void> sendEndOfStream() {
        Preconditions.checkState((this.socketWriter == null && !this.writeEndOfStreamRequest ? 1 : 0) != 0);
        if (this.closedException != null) {
            return Stage.ofException((Throwable)this.closedException);
        }
        SettableStage stage = SettableStage.create();
        this.writeEndOfStreamRequest = true;
        this.writeCallbacks.add((SettableStage<Void>)stage);
        this.asyncTcpSocket.writeEndOfStream();
        return stage;
    }

    @Override
    public StreamConsumerWithResult<ByteBuf, Void> sendBinaryStream() {
        Preconditions.checkState((this.socketWriter == null && !this.writeEndOfStreamRequest ? 1 : 0) != 0);
        this.writeCallbacks.clear();
        if (this.closedException != null) {
            return StreamConsumer.closingWithError(this.closedException).withEndOfStreamAsResult();
        }
        this.socketWriter = SocketStreamConsumer.create(this.asyncTcpSocket);
        return this.socketWriter.withResult(this.socketWriter.getSentStage());
    }

    @Override
    public StreamProducerWithResult<ByteBuf, Void> receiveBinaryStream() {
        Preconditions.checkState((this.socketReader == null && this.receiveMessageCallback == null ? 1 : 0) != 0);
        if (this.closedException != null) {
            StreamProducer producer = StreamProducer.closingWithError(this.closedException);
            return producer.withEndOfStreamAsResult();
        }
        this.socketReader = SocketStreamProducer.create(this.asyncTcpSocket);
        if (this.readBuf != null || this.readEndOfStream) {
            this.eventloop.post(() -> {
                if (this.readBuf != null) {
                    this.readUnconsumedBuf();
                }
                if (this.readEndOfStream) {
                    this.socketReader.onReadEndOfStream();
                }
            });
        }
        return this.socketReader.withEndOfStreamAsResult();
    }

    @Override
    public void close() {
        this.asyncTcpSocket.close();
        if (this.readBuf != null) {
            this.readBuf.recycle();
            this.readBuf = null;
        }
    }

    public void onRegistered() {
        this.asyncTcpSocket.read();
    }

    private void readUnconsumedBuf() {
        assert (this.readBuf != null);
        this.socketReader.onRead(this.readBuf);
        this.readBuf = null;
    }

    public void onRead(ByteBuf buf) {
        this.logger.trace("onRead", (Object)this);
        assert (this.eventloop.inEventloopThread());
        if (this.socketReader == null) {
            if (this.readBuf == null) {
                this.readBuf = ByteBufPool.allocate((int)Math.max(8192, buf.writeRemaining()));
            }
            this.readBuf = ByteBufPool.append((ByteBuf)this.readBuf, (ByteBuf)buf);
            this.tryReadMessage();
        } else {
            if (this.readBuf != null) {
                this.readUnconsumedBuf();
            }
            this.socketReader.onRead(buf);
        }
    }

    public void onReadEndOfStream() {
        this.logger.trace("onShutdownInput", (Object)this);
        this.readEndOfStream = true;
        if (this.socketReader == null) {
            this.tryReadMessage();
        } else {
            if (this.readBuf != null) {
                this.readUnconsumedBuf();
            }
            this.socketReader.onReadEndOfStream();
        }
        this.readDone = true;
        this.closeIfDone();
    }

    private void closeIfDone() {
        if (this.readDone && this.writeDone) {
            this.asyncTcpSocket.close();
        }
    }

    public void onWrite() {
        this.logger.trace("onWrite", (Object)this);
        if (this.socketWriter == null) {
            List<SettableStage<Void>> callbacks = this.writeCallbacks;
            this.writeCallbacks = new ArrayList<SettableStage<Void>>();
            for (SettableStage<Void> callback : callbacks) {
                callback.set(null);
            }
            if (this.writeEndOfStreamRequest) {
                this.writeDone = true;
            }
        } else {
            this.socketWriter.onWrite();
            if (this.socketWriter.getStatus().isClosed()) {
                this.writeDone = true;
            }
        }
        this.closeIfDone();
    }

    public void onClosedWithError(Exception e) {
        this.logger.trace("onClosedWithError", (Object)this);
        if (this.socketReader != null) {
            this.socketReader.closeWithError(e);
        } else if (this.socketWriter != null) {
            this.socketWriter.closeWithError(e);
        } else {
            this.closedException = e;
        }
        if (this.receiveMessageCallback != null) {
            this.receiveMessageCallback.setException((Throwable)e);
        } else if (!this.writeCallbacks.isEmpty()) {
            for (SettableStage<Void> writeCallback : this.writeCallbacks) {
                writeCallback.setException((Throwable)e);
            }
        }
        if (this.readBuf != null) {
            this.readBuf.recycle();
            this.readBuf = null;
        }
    }

    public String toString() {
        return "{asyncTcpSocket=" + this.asyncTcpSocket + "}";
    }
}

