/*
 * Decompiled with CFR 0.152.
 */
package io.activej.dataflow;

import io.activej.async.process.AsyncCloseable;
import io.activej.bytebuf.ByteBuf;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.binary.ByteBufsCodec;
import io.activej.csp.dsl.ChannelSupplierTransformer;
import io.activej.csp.dsl.ChannelTransformer;
import io.activej.csp.net.Messaging;
import io.activej.csp.net.MessagingWithBinaryStreaming;
import io.activej.csp.queue.ChannelBuffer;
import io.activej.csp.queue.ChannelBufferWithFallback;
import io.activej.csp.queue.ChannelFileBuffer;
import io.activej.csp.queue.ChannelQueue;
import io.activej.csp.queue.ChannelZeroBuffer;
import io.activej.dataflow.command.DataflowCommand;
import io.activej.dataflow.command.DataflowCommandDownload;
import io.activej.dataflow.command.DataflowCommandExecute;
import io.activej.dataflow.command.DataflowResponse;
import io.activej.dataflow.command.DataflowResponseResult;
import io.activej.dataflow.graph.StreamId;
import io.activej.dataflow.inject.BinarySerializerModule;
import io.activej.dataflow.node.Node;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.processor.StreamSupplierTransformer;
import io.activej.eventloop.net.SocketSettings;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.net.socket.tcp.AsyncTcpSocketNio;
import io.activej.promise.Promise;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DataflowClient {
    private static final Logger logger = LoggerFactory.getLogger(DataflowClient.class);
    private final SocketSettings socketSettings = SocketSettings.createDefault();
    private final Executor executor;
    private final Path secondaryPath;
    private final ByteBufsCodec<DataflowResponse, DataflowCommand> codec;
    private final BinarySerializerModule.BinarySerializerLocator serializers;
    private final AtomicInteger secondaryId = new AtomicInteger(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
    private int bufferMinSize;
    private int bufferMaxSize;

    public DataflowClient(Executor executor, Path secondaryPath, ByteBufsCodec<DataflowResponse, DataflowCommand> codec, BinarySerializerModule.BinarySerializerLocator serializers) {
        this.executor = executor;
        this.secondaryPath = secondaryPath;
        this.codec = codec;
        this.serializers = serializers;
    }

    public DataflowClient withBufferSizes(int bufferMinSize, int bufferMaxSize) {
        this.bufferMinSize = bufferMinSize;
        this.bufferMaxSize = bufferMaxSize;
        return this;
    }

    public <T> StreamSupplier<T> download(InetSocketAddress address, StreamId streamId, Class<T> type, ChannelTransformer<ByteBuf, ByteBuf> transformer) {
        return StreamSupplier.ofPromise((Promise)AsyncTcpSocketNio.connect((InetSocketAddress)address, (long)0L, (SocketSettings)this.socketSettings).then(socket -> {
            MessagingWithBinaryStreaming messaging = MessagingWithBinaryStreaming.create((AsyncTcpSocket)socket, this.codec);
            return messaging.send((Object)new DataflowCommandDownload(streamId)).map(arg_0 -> this.lambda$download$2((Messaging)messaging, transformer, type, streamId, address, arg_0));
        }));
    }

    public <T> StreamSupplier<T> download(InetSocketAddress address, StreamId streamId, Class<T> type) {
        return this.download(address, streamId, type, (ChannelTransformer<ByteBuf, ByteBuf>)ChannelTransformer.identity());
    }

    public Promise<Session> connect(InetSocketAddress address) {
        return AsyncTcpSocketNio.connect((InetSocketAddress)address, (long)0L, (SocketSettings)this.socketSettings).map(socket -> new Session(address, (AsyncTcpSocketNio)socket));
    }

    private /* synthetic */ StreamSupplier lambda$download$2(Messaging messaging, ChannelTransformer transformer, Class type, StreamId streamId, InetSocketAddress address, Void $) {
        ChannelBuffer primaryBuffer = this.bufferMinSize == 0 && this.bufferMaxSize == 0 ? new ChannelZeroBuffer() : new ChannelBuffer(this.bufferMinSize, this.bufferMaxSize);
        ChannelBufferWithFallback buffer = new ChannelBufferWithFallback((ChannelQueue)primaryBuffer, () -> ChannelFileBuffer.create((Executor)this.executor, (Path)this.secondaryPath.resolve(this.secondaryId.getAndIncrement() + ".bin")));
        return ((StreamSupplier)((StreamSupplier)((ChannelSupplier)((ChannelSupplier)messaging.receiveBinaryStream().transformWith((ChannelSupplierTransformer)transformer)).transformWith((ChannelSupplierTransformer)buffer)).transformWith((ChannelSupplierTransformer)ChannelDeserializer.create(this.serializers.get(type)).withExplicitEndOfStream())).transformWith(new StreamTraceCounter(streamId, address))).withEndOfStream(eos -> eos.whenComplete(() -> ((Messaging)messaging).close()));
    }

    public class Session
    implements AsyncCloseable {
        private final InetSocketAddress address;
        private final Messaging<DataflowResponse, DataflowCommand> messaging;

        private Session(InetSocketAddress address, AsyncTcpSocketNio socket) {
            this.address = address;
            this.messaging = MessagingWithBinaryStreaming.create((AsyncTcpSocket)socket, (ByteBufsCodec)DataflowClient.this.codec);
        }

        public Promise<Void> execute(long taskId, Collection<Node> nodes) {
            return this.messaging.send((Object)new DataflowCommandExecute(taskId, new ArrayList<Node>(nodes))).then(() -> this.messaging.receive()).then(response -> {
                this.messaging.close();
                if (!(response instanceof DataflowResponseResult)) {
                    return Promise.ofException((Throwable)new Exception("Bad response from server"));
                }
                String error = ((DataflowResponseResult)response).getError();
                if (error != null) {
                    return Promise.ofException((Throwable)new Exception("Error on remote server " + this.address + ": " + error));
                }
                return Promise.complete();
            });
        }

        public void closeEx(@NotNull Throwable e) {
            this.messaging.closeEx(e);
        }
    }

    private static class StreamTraceCounter<T>
    implements StreamSupplierTransformer<T, StreamSupplier<T>> {
        private final StreamId streamId;
        private final InetSocketAddress address;
        private int count = 0;
        private final Input input;
        private final Output output;

        private StreamTraceCounter(StreamId streamId, InetSocketAddress address) {
            this.streamId = streamId;
            this.address = address;
            this.input = new Input();
            this.output = new Output();
            this.input.getAcknowledgement().whenException(arg_0 -> ((Output)this.output).closeEx(arg_0));
            this.output.getAcknowledgement().whenResult(() -> ((Input)this.input).acknowledge()).whenException(arg_0 -> ((Input)this.input).closeEx(arg_0));
        }

        public StreamSupplier<T> transform(StreamSupplier<T> supplier) {
            supplier.streamTo((StreamConsumer)this.input);
            return this.output;
        }

        private final class Input
        extends AbstractStreamConsumer<T> {
            private Input() {
            }

            protected void onEndOfStream() {
                StreamTraceCounter.this.output.sendEndOfStream();
            }

            protected void onComplete() {
                logger.info("Received {} items total from stream {}({})", new Object[]{StreamTraceCounter.this.count, StreamTraceCounter.this.streamId, StreamTraceCounter.this.address});
            }
        }

        private final class Output
        extends AbstractStreamSupplier<T> {
            private Output() {
            }

            protected void onResumed() {
                StreamDataAcceptor dataAcceptor = this.getDataAcceptor();
                assert (dataAcceptor != null);
                StreamTraceCounter.this.input.resume(item -> {
                    if (++StreamTraceCounter.this.count == 1 || StreamTraceCounter.this.count % 1000 == 0) {
                        logger.info("Received {} items from stream {}({}): {}", new Object[]{StreamTraceCounter.this.count, StreamTraceCounter.this.streamId, StreamTraceCounter.this.address, item});
                    }
                    dataAcceptor.accept(item);
                });
            }

            protected void onSuspended() {
                StreamTraceCounter.this.input.suspend();
            }
        }
    }
}

