package hu.akarnokd.reactive.rpc;

import hu.akarnokd.reactive.pc.RsPcReceive;
import hu.akarnokd.reactive.pc.RsPcSend;
import hu.akarnokd.reactive.rpc.RsRpcProtocol;
import io.reactivex.Scheduler;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:hu/akarnokd/reactive/rpc/RpcIOManager.class */
public final class RpcIOManager implements RsRpcProtocol.RsRpcReceive, RsPcSend {
    final Scheduler.Worker reader;
    final Scheduler.Worker writer;
    final InputStream in;
    final OutputStream out;
    volatile boolean closed;
    final boolean server;
    final RsPcReceive receive;
    final AtomicBoolean terminateOnce = new AtomicBoolean();
    final byte[] readBuffer = new byte[16];
    final byte[] writeBuffer = new byte[32];

    /* loaded from: input_file:hu/akarnokd/reactive/rpc/RpcIOManager$OnNextTask.class */
    static final class OnNextTask implements Runnable, IntConsumer {
        final long streamId;
        final OutputStream out;
        final byte[] writeBuffer;
        final boolean server;
        final Object object;
        byte[] payload;
        int flags;

        public OnNextTask(long j, OutputStream outputStream, byte[] bArr, boolean z, Object obj) {
            this.streamId = j;
            this.out = outputStream;
            this.server = z;
            this.writeBuffer = bArr;
            this.object = obj;
        }

        @Override // java.lang.Runnable
        public void run() {
            RsRpcProtocol.next(this.out, this.streamId, this.flags, this.payload, this.writeBuffer);
            RpcIOManager.flush(this.out);
        }

        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            this.flags = i;
        }
    }

    public RpcIOManager(Scheduler.Worker worker, InputStream inputStream, Scheduler.Worker worker2, OutputStream outputStream, RsPcReceive rsPcReceive, boolean z) {
        this.reader = worker;
        this.writer = worker2;
        this.in = inputStream;
        this.out = outputStream;
        this.server = z;
        this.receive = rsPcReceive;
    }

    public void start() {
        this.reader.schedule(this::handleRead);
    }

    public void close() {
        this.closed = true;
        try {
            this.in.close();
        } catch (IOException e) {
            RxJavaPlugins.onError(e);
        }
        try {
            this.out.close();
        } catch (IOException e2) {
            RxJavaPlugins.onError(e2);
        }
        this.reader.dispose();
        this.writer.dispose();
    }

    void handleRead() {
        while (!Thread.currentThread().isInterrupted() && !this.closed && RsRpcProtocol.receive(this.in, this.readBuffer, this)) {
        }
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onNew(long j, String str) {
        this.receive.onNew(j, str);
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onCancel(long j, String str) {
        this.receive.onCancel(j, str);
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onNext(long j, int i, byte[] bArr, int i2, int i3) {
        if (i2 != i3) {
            this.receive.onError(j, new IOException("Partial value received: expected = " + bArr.length + ", actual = " + i3));
            return;
        }
        try {
            try {
                this.receive.onNext(j, decode(i, bArr, i2));
            } catch (Throwable th) {
                sendCancel(j, th.toString());
                this.receive.onError(j, th);
            }
        } catch (IOException | ClassNotFoundException e) {
            sendCancel(j, e.toString());
            this.receive.onError(j, e);
        }
    }

    Object decode(int i, byte[] bArr, int i2) throws IOException, ClassNotFoundException {
        if (i == 1) {
            return Integer.valueOf((bArr[0] & 255) | ((bArr[1] & 255) << 8) | ((bArr[2] & 255) << 16) | ((bArr[3] & 255) << 24));
        }
        if (i == 2) {
            return Long.valueOf((bArr[0] & 255) | ((bArr[1] & 255) << 8) | ((bArr[2] & 255) << 16) | ((bArr[3] & 255) << 24) | ((bArr[4] & 255) << 32) | ((bArr[5] & 255) << 40) | ((bArr[6] & 255) << 48) | ((bArr[7] & 255) << 56));
        }
        if (i == 3) {
            return RpcHelper.readUtf8(bArr, 0, i2);
        }
        if (i != 4) {
            return new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
        }
        if (bArr != this.readBuffer) {
            return bArr;
        }
        byte[] bArr2 = new byte[i2];
        System.arraycopy(bArr, 0, bArr2, 0, i2);
        return bArr2;
    }

    byte[] encode(Object obj, IntConsumer intConsumer) throws IOException {
        if (obj instanceof Integer) {
            intConsumer.accept(1);
            int intValue = ((Integer) obj).intValue();
            return new byte[]{(byte) (intValue & 255), (byte) ((intValue >> 8) & 255), (byte) ((intValue >> 16) & 255), (byte) ((intValue >> 24) & 255)};
        }
        if (obj instanceof Long) {
            intConsumer.accept(2);
            long longValue = ((Long) obj).longValue();
            return new byte[]{(byte) (longValue & 255), (byte) ((longValue >> 8) & 255), (byte) ((longValue >> 16) & 255), (byte) ((longValue >> 24) & 255), (byte) ((longValue >> 32) & 255), (byte) ((longValue >> 40) & 255), (byte) ((longValue >> 48) & 255), (byte) ((longValue >> 56) & 255)};
        }
        if (obj instanceof String) {
            intConsumer.accept(3);
            return RsRpcProtocol.utf8((String) obj);
        }
        if (obj instanceof byte[]) {
            intConsumer.accept(4);
            return (byte[]) ((byte[]) obj).clone();
        }
        intConsumer.accept(0);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        Throwable th = null;
        try {
            try {
                objectOutputStream.writeObject(obj);
                if (objectOutputStream != null) {
                    if (0 != 0) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                return byteArrayOutputStream.toByteArray();
            } finally {
            }
        } catch (Throwable th3) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th3;
        }
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onError(long j, String str) {
        this.receive.onError(j, str);
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onComplete(long j) {
        this.receive.onComplete(j);
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onRequested(long j, long j2) {
        this.receive.onRequested(j, j2);
    }

    @Override // hu.akarnokd.reactive.rpc.RsRpcProtocol.RsRpcReceive
    public void onUnknown(int i, int i2, long j, byte[] bArr, int i3) {
    }

    static void flush(OutputStream outputStream) {
        try {
            outputStream.flush();
        } catch (IOException e) {
            RxJavaPlugins.onError(e);
        }
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendNew(long j, String str) {
        this.writer.schedule(() -> {
            RsRpcProtocol.open(this.out, j, str, this.writeBuffer);
            flush(this.out);
        });
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendNext(long j, Object obj) throws IOException {
        OnNextTask onNextTask = new OnNextTask(j, this.out, this.writeBuffer, this.server, null);
        onNextTask.payload = encode(obj, onNextTask);
        this.writer.schedule(onNextTask);
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendError(long j, Throwable th) {
        this.writer.schedule(() -> {
            RsRpcProtocol.error(this.out, j, th, this.writeBuffer);
            flush(this.out);
        });
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendComplete(long j) {
        this.writer.schedule(() -> {
            RsRpcProtocol.complete(this.out, j, this.writeBuffer);
            flush(this.out);
        });
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendCancel(long j, String str) {
        this.writer.schedule(() -> {
            RsRpcProtocol.cancel(this.out, j, str, this.writeBuffer);
            flush(this.out);
        });
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public void sendRequested(long j, long j2) {
        this.writer.schedule(() -> {
            RsRpcProtocol.request(this.out, j, j2, this.writeBuffer);
            flush(this.out);
        });
    }

    @Override // hu.akarnokd.reactive.pc.RsPcSend
    public boolean isClosed() {
        return this.closed;
    }
}
