package io.activej.crdt;

import io.activej.async.service.ReactiveService;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.function.ConsumerEx;
import io.activej.common.function.FunctionEx;
import io.activej.crdt.messaging.CrdtRequest;
import io.activej.crdt.messaging.CrdtResponse;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.crdt.util.CrdtDataBinarySerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.binary.codec.ByteBufsCodec;
import io.activej.csp.binary.codec.ByteBufsCodecs;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.net.Messaging;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.net.socket.tcp.TcpSocket;
import io.activej.promise.Promise;
import io.activej.reactor.AbstractNioReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.reactor.net.SocketSettings;
import io.activej.reactor.nio.NioReactor;
import io.activej.serializer.BinarySerializer;
import java.lang.Comparable;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Objects;

/* loaded from: input_file:io/activej/crdt/RemoteCrdtStorage.class */
public final class RemoteCrdtStorage<K extends Comparable<K>, S> extends AbstractNioReactive implements ICrdtStorage<K, S>, ReactiveService, ReactiveJmxBeanWithStats {
    private static final boolean CHECKS = Checks.isEnabled(RemoteCrdtStorage.class);
    public static final Duration DEFAULT_CONNECT_TIMEOUT = ApplicationSettings.getDuration(RemoteCrdtStorage.class, "connectTimeout", Duration.ZERO);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = ApplicationSettings.getDuration(RemoteCrdtStorage.class, "smoothingWindow", Duration.ofMinutes(1));
    private static final ByteBufsCodec<CrdtResponse, CrdtRequest> SERIALIZER = ByteBufsCodecs.ofStreamCodecs(Utils.CRDT_RESPONSE_CODEC, Utils.CRDT_REQUEST_CODEC);
    private final InetSocketAddress address;
    private final CrdtDataBinarySerializer<K, S> serializer;
    private final BinarySerializer<CrdtTombstone<K>> tombstoneSerializer;
    private long connectTimeoutMillis;
    private SocketSettings socketSettings;
    private boolean detailedStats;
    private final BasicStreamStats<CrdtData<K, S>> uploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> uploadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> downloadStats;
    private final DetailedStreamStats<CrdtData<K, S>> downloadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> takeStats;
    private final DetailedStreamStats<CrdtData<K, S>> takeStatsDetailed;
    private final BasicStreamStats<CrdtTombstone<K>> removeStats;
    private final DetailedStreamStats<CrdtTombstone<K>> removeStatsDetailed;
    private final EventStats uploadedItems;
    private final EventStats downloadedItems;
    private final EventStats takenItems;
    private final EventStats removedItems;

    /* loaded from: input_file:io/activej/crdt/RemoteCrdtStorage$Builder.class */
    public final class Builder extends AbstractBuilder<RemoteCrdtStorage<K, S>.Builder, RemoteCrdtStorage<K, S>> {
        private Builder() {
        }

        public RemoteCrdtStorage<K, S>.Builder withConnectTimeout(Duration duration) {
            checkNotBuilt(this);
            RemoteCrdtStorage.this.connectTimeoutMillis = duration.toMillis();
            return this;
        }

        public RemoteCrdtStorage<K, S>.Builder withSocketSettings(SocketSettings socketSettings) {
            checkNotBuilt(this);
            RemoteCrdtStorage.this.socketSettings = socketSettings;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public RemoteCrdtStorage<K, S> m2doBuild() {
            return RemoteCrdtStorage.this;
        }
    }

    private RemoteCrdtStorage(NioReactor nioReactor, InetSocketAddress inetSocketAddress, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        super(nioReactor);
        this.connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT.toMillis();
        this.socketSettings = SocketSettings.defaultInstance();
        this.uploadStats = StreamStats.basic();
        this.uploadStatsDetailed = StreamStats.detailed();
        this.downloadStats = StreamStats.basic();
        this.downloadStatsDetailed = StreamStats.detailed();
        this.takeStats = StreamStats.basic();
        this.takeStatsDetailed = StreamStats.detailed();
        this.removeStats = StreamStats.basic();
        this.removeStatsDetailed = StreamStats.detailed();
        this.uploadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.downloadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.takenItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.removedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.address = inetSocketAddress;
        this.serializer = crdtDataBinarySerializer;
        this.tombstoneSerializer = crdtDataBinarySerializer.getTombstoneSerializer();
    }

    public static <K extends Comparable<K>, S> RemoteCrdtStorage<K, S> create(NioReactor nioReactor, InetSocketAddress inetSocketAddress, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        return (RemoteCrdtStorage) builder(nioReactor, inetSocketAddress, crdtDataBinarySerializer).build();
    }

    public static <K extends Comparable<K>, S> RemoteCrdtStorage<K, S> create(NioReactor nioReactor, InetSocketAddress inetSocketAddress, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return (RemoteCrdtStorage) builder(nioReactor, inetSocketAddress, binarySerializer, binarySerializer2).build();
    }

    public static <K extends Comparable<K>, S> RemoteCrdtStorage<K, S>.Builder builder(NioReactor nioReactor, InetSocketAddress inetSocketAddress, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        return new Builder();
    }

    public static <K extends Comparable<K>, S> RemoteCrdtStorage<K, S>.Builder builder(NioReactor nioReactor, InetSocketAddress inetSocketAddress, BinarySerializer<K> binarySerializer, BinarySerializer<S> binarySerializer2) {
        return new Builder();
    }

    public InetSocketAddress getAddress() {
        return this.address;
    }

    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return connect().then(RemoteCrdtStorage::performHandshake).then(messaging -> {
            Promise map = messaging.send(new CrdtRequest.Upload()).mapException(exc -> {
                return new CrdtException("Failed to send 'Upload' request", exc);
            }).map(r4 -> {
                return messaging.sendBinaryStream().withAcknowledgement(promise -> {
                    Objects.requireNonNull(messaging);
                    return promise.then(messaging::receive).whenResult(validateFn(CrdtResponse.UploadAck.class)).toVoid();
                });
            }).map(channelConsumer -> {
                return StreamConsumers.ofSupplier(streamSupplier -> {
                    StreamSupplier streamSupplier = (StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.uploadStatsDetailed : this.uploadStats);
                    EventStats eventStats = this.uploadedItems;
                    Objects.requireNonNull(eventStats);
                    return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(Utils.onItem(eventStats::recordEvent))).transformWith(Utils.createSerializer(this.serializer))).streamTo(channelConsumer);
                }).withAcknowledgement(promise -> {
                    return promise.mapException(exc2 -> {
                        return new CrdtException("Upload failed", exc2);
                    });
                });
            });
            Objects.requireNonNull(messaging);
            return map.whenException(messaging::closeEx);
        });
    }

    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long j) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return connect().then(RemoteCrdtStorage::performHandshake).then(messaging -> {
            Promise map = messaging.send(new CrdtRequest.Download(j)).mapException(exc -> {
                return new CrdtException("Failed to send 'Download' request", exc);
            }).then(() -> {
                return messaging.receive().mapException(exc2 -> {
                    return new CrdtException("Failed to receive response", exc2);
                });
            }).whenResult(validateFn(CrdtResponse.DownloadStarted.class)).map(crdtResponse -> {
                StreamSupplier streamSupplier = (StreamSupplier) ((StreamSupplier) messaging.receiveBinaryStream().transformWith(Utils.createDeserializer(this.serializer))).transformWith(this.detailedStats ? this.downloadStatsDetailed : this.downloadStats);
                EventStats eventStats = this.downloadedItems;
                Objects.requireNonNull(eventStats);
                return ((StreamSupplier) streamSupplier.transformWith(Utils.onItem(eventStats::recordEvent))).withEndOfStream(promise -> {
                    Objects.requireNonNull(messaging);
                    Promise mapException = promise.then(messaging::sendEndOfStream).mapException(exc2 -> {
                        return new CrdtException("Download failed", exc2);
                    });
                    Objects.requireNonNull(messaging);
                    Promise whenResult = mapException.whenResult(messaging::close);
                    Objects.requireNonNull(messaging);
                    return whenResult.whenException(messaging::closeEx);
                });
            });
            Objects.requireNonNull(messaging);
            return map.whenException(messaging::closeEx);
        });
    }

    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> take() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return connect().then(RemoteCrdtStorage::performHandshake).then(messaging -> {
            Promise map = messaging.send(new CrdtRequest.Take()).mapException(exc -> {
                return new CrdtException("Failed to send 'Take' request", exc);
            }).then(() -> {
                return messaging.receive().mapException(exc2 -> {
                    return new CrdtException("Failed to receive response", exc2);
                });
            }).whenResult(validateFn(CrdtResponse.TakeStarted.class)).map(crdtResponse -> {
                StreamSupplier streamSupplier = (StreamSupplier) ((StreamSupplier) messaging.receiveBinaryStream().transformWith(Utils.createDeserializer(this.serializer))).transformWith(this.detailedStats ? this.takeStatsDetailed : this.takeStats);
                EventStats eventStats = this.takenItems;
                Objects.requireNonNull(eventStats);
                StreamSupplier streamSupplier2 = (StreamSupplier) streamSupplier.transformWith(Utils.onItem(eventStats::recordEvent));
                Promise then = streamSupplier2.getAcknowledgement().then(() -> {
                    return messaging.send(new CrdtRequest.TakeAck());
                });
                Objects.requireNonNull(messaging);
                Promise mapException = then.then(messaging::sendEndOfStream).mapException(exc2 -> {
                    return new CrdtException("Take failed", exc2);
                });
                Objects.requireNonNull(messaging);
                Promise whenResult = mapException.whenResult(messaging::close);
                Objects.requireNonNull(messaging);
                whenResult.whenException(messaging::closeEx);
                return streamSupplier2;
            });
            Objects.requireNonNull(messaging);
            return map.whenException(messaging::closeEx);
        });
    }

    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtTombstone<K>>> remove() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return connect().then(RemoteCrdtStorage::performHandshake).then(messaging -> {
            Promise map = messaging.send(new CrdtRequest.Remove()).mapException(exc -> {
                return new CrdtException("Failed to send 'Remove' request", exc);
            }).map(r5 -> {
                ChannelConsumer withAcknowledgement = messaging.sendBinaryStream().withAcknowledgement(promise -> {
                    Objects.requireNonNull(messaging);
                    return promise.then(messaging::receive).whenResult(validateFn(CrdtResponse.RemoveAck.class)).toVoid();
                });
                return StreamConsumers.ofSupplier(streamSupplier -> {
                    StreamSupplier streamSupplier = (StreamSupplier) streamSupplier.transformWith(this.detailedStats ? this.removeStatsDetailed : this.removeStats);
                    EventStats eventStats = this.removedItems;
                    Objects.requireNonNull(eventStats);
                    return ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(Utils.onItem(eventStats::recordEvent))).transformWith(Utils.createSerializer(this.tombstoneSerializer))).streamTo(withAcknowledgement);
                }).withAcknowledgement(promise2 -> {
                    return promise2.mapException(exc2 -> {
                        return new CrdtException("Remove operation failed", exc2);
                    });
                });
            });
            Objects.requireNonNull(messaging);
            return map.whenException(messaging::closeEx);
        });
    }

    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<Void> ping() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return connect().then(RemoteCrdtStorage::performHandshake).then(messaging -> {
            Promise promise = messaging.send(new CrdtRequest.Ping()).mapException(exc -> {
                return new CrdtException("Failed to send 'Ping'", exc);
            }).then(() -> {
                return messaging.receive().mapException(exc2 -> {
                    return new CrdtException("Failed to receive 'Pong'", exc2);
                });
            }).whenResult(validateFn(CrdtResponse.Pong.class)).toVoid();
            Objects.requireNonNull(messaging);
            Promise whenResult = promise.whenResult(messaging::close);
            Objects.requireNonNull(messaging);
            return whenResult.whenException(messaging::closeEx);
        });
    }

    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return ping();
    }

    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    private static <T extends CrdtResponse> FunctionEx<CrdtResponse, T> castFn(Class<T> cls) {
        return crdtResponse -> {
            if (crdtResponse instanceof CrdtResponse.ServerError) {
                throw new CrdtException(((CrdtResponse.ServerError) crdtResponse).message());
            }
            if (crdtResponse.getClass() != cls) {
                throw new CrdtException("Received response " + crdtResponse + " instead of " + cls.getName());
            }
            return crdtResponse;
        };
    }

    private static ConsumerEx<CrdtResponse> validateFn(Class<? extends CrdtResponse> cls) {
        return crdtResponse -> {
            castFn(cls).apply(crdtResponse);
        };
    }

    private Promise<Messaging<CrdtResponse, CrdtRequest>> connect() {
        return TcpSocket.connect(this.reactor, this.address, this.connectTimeoutMillis, this.socketSettings).map(tcpSocket -> {
            return Messaging.create(tcpSocket, SERIALIZER);
        }).mapException(exc -> {
            return new CrdtException("Failed to connect to " + this.address, exc);
        });
    }

    private static Promise<Messaging<CrdtResponse, CrdtRequest>> performHandshake(Messaging<CrdtResponse, CrdtRequest> messaging) {
        Promise send = messaging.send(new CrdtRequest.Handshake(CrdtServer.VERSION));
        Objects.requireNonNull(messaging);
        Promise map = send.then(messaging::receive).map(castFn(CrdtResponse.Handshake.class)).map(handshake -> {
            CrdtResponse.HandshakeFailure handshakeFailure = handshake.handshakeFailure();
            if (handshakeFailure != null) {
                throw new CrdtException(String.format("Handshake failed: %s. Minimal allowed version: %s", handshakeFailure.message(), handshakeFailure.minimalVersion()));
            }
            return messaging;
        });
        Objects.requireNonNull(messaging);
        return map.whenException(messaging::closeEx);
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailedStats = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailedStats = false;
    }

    @JmxAttribute
    public BasicStreamStats getUploadStats() {
        return this.uploadStats;
    }

    @JmxAttribute
    public DetailedStreamStats getUploadStatsDetailed() {
        return this.uploadStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getDownloadStats() {
        return this.downloadStats;
    }

    @JmxAttribute
    public DetailedStreamStats getDownloadStatsDetailed() {
        return this.downloadStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getTakeStats() {
        return this.takeStats;
    }

    @JmxAttribute
    public DetailedStreamStats getTakeStatsDetailed() {
        return this.takeStatsDetailed;
    }

    @JmxAttribute
    public BasicStreamStats getRemoveStats() {
        return this.removeStats;
    }

    @JmxAttribute
    public DetailedStreamStats getRemoveStatsDetailed() {
        return this.removeStatsDetailed;
    }

    @JmxAttribute
    public EventStats getUploadedItems() {
        return this.uploadedItems;
    }

    @JmxAttribute
    public EventStats getDownloadedItems() {
        return this.downloadedItems;
    }

    @JmxAttribute
    public EventStats getTakenItems() {
        return this.takenItems;
    }

    @JmxAttribute
    public EventStats getRemovedItems() {
        return this.removedItems;
    }
}
