package hu.akarnokd.reactiverpc;

import hu.akarnokd.reactiverpc.RsRpcProtocol;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import rsc.scheduler.Scheduler;
import rsc.util.UnsignalledExceptions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:hu/akarnokd/reactiverpc/RpcIOManager.class */
public final class RpcIOManager implements RsRpcProtocol.RsRpcReceive {
    static volatile boolean logMessages = false;
    final Scheduler.Worker reader;
    final Scheduler.Worker writer;
    final InputStream in;
    final OutputStream out;
    final OnNewStream onNew;
    final AtomicLong streamIds;
    volatile boolean closed;
    final Runnable onTerminate;
    final boolean server;
    final byte[] readBuffer;
    final byte[] writeBuffer;
    static final byte PAYLOAD_OBJECT = 0;
    static final byte PAYLOAD_INT = 1;
    static final byte PAYLOAD_LONG = 2;
    static final byte PAYLOAD_STRING = 3;
    static final byte PAYLOAD_BYTES = 4;
    final AtomicBoolean terminateOnce = new AtomicBoolean();
    final ConcurrentMap<Long, Object> streams = new ConcurrentHashMap();

    @FunctionalInterface
    /* loaded from: input_file:hu/akarnokd/reactiverpc/RpcIOManager$OnNewStream.class */
    public interface OnNewStream {
        boolean onNew(long j, String str, RpcIOManager rpcIOManager);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:hu/akarnokd/reactiverpc/RpcIOManager$OnNextTask.class */
    public static final class OnNextTask implements Runnable, IntConsumer {
        final long streamId;
        final OutputStream out;
        final byte[] writeBuffer;
        final boolean server;
        final Object object;
        byte[] payload;
        int flags;

        public OnNextTask(long j, OutputStream outputStream, byte[] bArr, boolean z, Object obj) {
            this.streamId = j;
            this.out = outputStream;
            this.server = z;
            this.writeBuffer = bArr;
            this.object = obj;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (RpcIOManager.logMessages) {
                PrintStream printStream = System.out;
                Object[] objArr = new Object[3];
                objArr[RpcIOManager.PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(this.streamId);
                objArr[2] = this.object;
                printStream.printf("%s/sendNext/%d/%s%n", objArr);
            }
            RsRpcProtocol.next(this.out, this.streamId, this.flags, this.payload, this.writeBuffer);
            RpcIOManager.flush(this.out);
        }

        @Override // java.util.function.IntConsumer
        public void accept(int i) {
            this.flags = i;
        }
    }

    public RpcIOManager(Scheduler.Worker worker, InputStream inputStream, Scheduler.Worker worker2, OutputStream outputStream, OnNewStream onNewStream, Runnable runnable, boolean z) {
        this.reader = worker;
        this.writer = worker2;
        this.in = inputStream;
        this.out = outputStream;
        this.onNew = onNewStream;
        this.onTerminate = runnable;
        this.streamIds = new AtomicLong((z ? Long.MIN_VALUE : 0L) + 1);
        this.server = z;
        this.readBuffer = new byte[16];
        this.writeBuffer = new byte[32];
    }

    public void start() {
        this.reader.schedule(this::handleRead);
    }

    public void close() {
        this.closed = true;
        try {
            this.in.close();
        } catch (IOException e) {
            UnsignalledExceptions.onErrorDropped(e);
        }
        try {
            this.out.close();
        } catch (IOException e2) {
            UnsignalledExceptions.onErrorDropped(e2);
        }
        this.reader.shutdown();
        this.writer.shutdown();
    }

    void handleRead() {
        while (!Thread.currentThread().isInterrupted() && !this.closed && RsRpcProtocol.receive(this.in, this.readBuffer, this)) {
        }
    }

    public long newStreamId() {
        return this.streamIds.getAndIncrement();
    }

    public void registerSubscription(long j, Subscription subscription) {
        if (this.streams.putIfAbsent(Long.valueOf(j), subscription) != null) {
            throw new IllegalStateException("StreamID " + j + " already registered");
        }
    }

    public void registerSubscriber(long j, Subscriber<?> subscriber) {
        if (this.streams.putIfAbsent(Long.valueOf(j), subscriber) != null) {
            throw new IllegalStateException("StreamID " + j + " already registered");
        }
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onNew(long j, String str) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[3];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = str;
            printStream.printf("%s/onNew/%d/%s%n", objArr);
        }
        if (this.onNew.onNew(j, str, this)) {
            return;
        }
        this.writer.schedule(() -> {
            if (logMessages) {
                PrintStream printStream2 = System.out;
                Object[] objArr2 = new Object[3];
                objArr2[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr2[1] = Long.valueOf(j);
                objArr2[2] = "New stream(" + str + ") rejected";
                printStream2.printf("%s/onNew/%d/%s%n", objArr2);
            }
            RsRpcProtocol.cancel(this.out, j, "New stream(" + str + ") rejected", this.writeBuffer);
        });
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onCancel(long j, String str) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[3];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = str;
            printStream.printf("%s/onCancel/%d/%s%n", objArr);
        }
        Object obj = this.streams.get(Long.valueOf(j));
        if (obj != null) {
            if (obj instanceof Subscription) {
                ((Subscription) obj).cancel();
            } else {
                UnsignalledExceptions.onErrorDropped(new IllegalStateException("Stream " + j + " directed at wrong receiver: " + obj.getClass()));
            }
        }
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onNext(long j, int i, byte[] bArr, int i2, int i3) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[4];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = Integer.valueOf(bArr.length);
            objArr[3] = Integer.valueOf(i3);
            printStream.printf("%s/onNext/%d/len=%d/%d%n", objArr);
        }
        Object obj = this.streams.get(Long.valueOf(j));
        if (obj instanceof Subscriber) {
            Subscriber subscriber = (Subscriber) obj;
            if (i2 != i3) {
                subscriber.onError(new IOException("Partial value received: expected = " + bArr.length + ", actual = " + i3));
                return;
            }
            try {
                Object decode = decode(i, bArr, i2);
                if (logMessages) {
                    PrintStream printStream2 = System.out;
                    Object[] objArr2 = new Object[3];
                    objArr2[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                    objArr2[1] = Long.valueOf(j);
                    objArr2[2] = decode;
                    printStream2.printf("%s/onNext/%d/value=%s%n", objArr2);
                }
                try {
                    subscriber.onNext(decode);
                } catch (Throwable th) {
                    sendCancel(j, th.toString());
                    subscriber.onError(th);
                }
            } catch (IOException | ClassNotFoundException e) {
                sendCancel(j, e.toString());
                subscriber.onError(e);
            }
        }
    }

    Object decode(int i, byte[] bArr, int i2) throws IOException, ClassNotFoundException {
        if (i == 1) {
            return Integer.valueOf((bArr[PAYLOAD_OBJECT] & 255) | ((bArr[1] & 255) << 8) | ((bArr[2] & 255) << 16) | ((bArr[3] & 255) << 24));
        }
        if (i == 2) {
            return Long.valueOf((bArr[PAYLOAD_OBJECT] & 255) | ((bArr[1] & 255) << 8) | ((bArr[2] & 255) << 16) | ((bArr[3] & 255) << 24) | ((bArr[4] & 255) << 32) | ((bArr[5] & 255) << 40) | ((bArr[6] & 255) << 48) | ((bArr[7] & 255) << 56));
        }
        if (i == 3) {
            return RpcHelper.readUtf8(bArr, PAYLOAD_OBJECT, i2);
        }
        if (i != 4) {
            return new ObjectInputStream(new ByteArrayInputStream(bArr)).readObject();
        }
        if (bArr != this.readBuffer) {
            return bArr;
        }
        byte[] bArr2 = new byte[i2];
        System.arraycopy(bArr, PAYLOAD_OBJECT, bArr2, PAYLOAD_OBJECT, i2);
        return bArr2;
    }

    byte[] encode(Object obj, IntConsumer intConsumer) throws IOException {
        if (obj instanceof Integer) {
            intConsumer.accept(1);
            int intValue = ((Integer) obj).intValue();
            return new byte[]{(byte) (intValue & 255), (byte) ((intValue >> 8) & 255), (byte) ((intValue >> 16) & 255), (byte) ((intValue >> 24) & 255)};
        }
        if (obj instanceof Long) {
            intConsumer.accept(2);
            long longValue = ((Long) obj).longValue();
            return new byte[]{(byte) (longValue & 255), (byte) ((longValue >> 8) & 255), (byte) ((longValue >> 16) & 255), (byte) ((longValue >> 24) & 255), (byte) ((longValue >> 32) & 255), (byte) ((longValue >> 40) & 255), (byte) ((longValue >> 48) & 255), (byte) ((longValue >> 56) & 255)};
        }
        if (obj instanceof String) {
            intConsumer.accept(3);
            return RsRpcProtocol.utf8((String) obj);
        }
        if (obj instanceof byte[]) {
            intConsumer.accept(4);
            return (byte[]) ((byte[]) obj).clone();
        }
        intConsumer.accept(PAYLOAD_OBJECT);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        Throwable th = PAYLOAD_OBJECT;
        try {
            try {
                objectOutputStream.writeObject(obj);
                if (objectOutputStream != null) {
                    if (th != null) {
                        try {
                            objectOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        objectOutputStream.close();
                    }
                }
                return byteArrayOutputStream.toByteArray();
            } finally {
            }
        } catch (Throwable th3) {
            if (objectOutputStream != null) {
                if (th != null) {
                    try {
                        objectOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    objectOutputStream.close();
                }
            }
            throw th3;
        }
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onError(long j, String str) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[3];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = str;
            printStream.printf("%s/onError/%d/%s%n", objArr);
        }
        if (j > 0) {
            Object obj = this.streams.get(Long.valueOf(j));
            if (obj instanceof Subscriber) {
                ((Subscriber) obj).onError(new Exception(str));
                return;
            }
        }
        if (j < 0) {
            if (this.terminateOnce.compareAndSet(false, true)) {
                this.onTerminate.run();
            }
            if (this.closed) {
                return;
            }
        }
        UnsignalledExceptions.onErrorDropped(new Exception(str));
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onComplete(long j) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[2];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            printStream.printf("%s/onComplete/%d%n", objArr);
        }
        Object obj = this.streams.get(Long.valueOf(j));
        if (obj instanceof Subscriber) {
            ((Subscriber) obj).onComplete();
        }
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onRequested(long j, long j2) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[3];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = Long.valueOf(j2);
            printStream.printf("%s/onRequested/%d/%d%n", objArr);
        }
        Object obj = this.streams.get(Long.valueOf(j));
        if (obj instanceof Subscription) {
            ((Subscription) obj).request(j2);
        }
    }

    @Override // hu.akarnokd.reactiverpc.RsRpcProtocol.RsRpcReceive
    public void onUnknown(int i, int i2, long j, byte[] bArr, int i3) {
        if (logMessages) {
            PrintStream printStream = System.out;
            Object[] objArr = new Object[4];
            objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
            objArr[1] = Long.valueOf(j);
            objArr[2] = Integer.valueOf(bArr.length);
            objArr[3] = Integer.valueOf(i3);
            printStream.printf("%s/onUnknown/%d/len=%d/%d%n", objArr);
        }
    }

    static void flush(OutputStream outputStream) {
        try {
            outputStream.flush();
        } catch (IOException e) {
            UnsignalledExceptions.onErrorDropped(e);
        }
    }

    public void sendNew(long j, String str) {
        this.writer.schedule(() -> {
            if (logMessages) {
                PrintStream printStream = System.out;
                Object[] objArr = new Object[3];
                objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(j);
                objArr[2] = str;
                printStream.printf("%s/sendNew/%d/%s%n", objArr);
            }
            RsRpcProtocol.open(this.out, j, str, this.writeBuffer);
            flush(this.out);
        });
    }

    public void sendNext(long j, Object obj) throws IOException {
        OnNextTask onNextTask = new OnNextTask(j, this.out, this.writeBuffer, this.server, logMessages ? obj : null);
        onNextTask.payload = encode(obj, onNextTask);
        this.writer.schedule(onNextTask);
    }

    public void sendError(long j, Throwable th) {
        this.writer.schedule(() -> {
            if (logMessages) {
                th.printStackTrace();
                PrintStream printStream = System.out;
                Object[] objArr = new Object[3];
                objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(j);
                objArr[2] = th;
                printStream.printf("%s/sendError/%d/%s%n", objArr);
            }
            RsRpcProtocol.error(this.out, j, th, this.writeBuffer);
            flush(this.out);
        });
    }

    public void sendComplete(long j) {
        this.writer.schedule(() -> {
            if (logMessages) {
                PrintStream printStream = System.out;
                Object[] objArr = new Object[2];
                objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(j);
                printStream.printf("%s/sendComplete/%d%n", objArr);
            }
            RsRpcProtocol.complete(this.out, j, this.writeBuffer);
            flush(this.out);
        });
    }

    public void sendCancel(long j, String str) {
        this.writer.schedule(() -> {
            if (logMessages) {
                PrintStream printStream = System.out;
                Object[] objArr = new Object[3];
                objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(j);
                objArr[2] = str;
                printStream.printf("%s/sendCancel/%d/%s%n", objArr);
            }
            RsRpcProtocol.cancel(this.out, j, str, this.writeBuffer);
            flush(this.out);
        });
    }

    public void sendRequested(long j, long j2) {
        this.writer.schedule(() -> {
            if (logMessages) {
                PrintStream printStream = System.out;
                Object[] objArr = new Object[3];
                objArr[PAYLOAD_OBJECT] = this.server ? "server" : "client";
                objArr[1] = Long.valueOf(j);
                objArr[2] = Long.valueOf(j2);
                printStream.printf("%s/sendRequested/%d/%d%n", objArr);
            }
            RsRpcProtocol.request(this.out, j, j2, this.writeBuffer);
            flush(this.out);
        });
    }

    public void deregister(long j) {
        if (this.streams.remove(Long.valueOf(j)) == null) {
        }
    }
}
