package io.streamnative.oxia.client;

import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.GetResult;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.batch.BatchManager;
import io.streamnative.oxia.client.batch.Operation;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.grpc.OxiaStubManager;
import io.streamnative.oxia.client.metrics.BatchMetrics;
import io.streamnative.oxia.client.metrics.OperationMetrics;
import io.streamnative.oxia.client.notify.NotificationManager;
import io.streamnative.oxia.client.session.SessionManager;
import io.streamnative.oxia.client.shard.ShardManager;
import io.streamnative.oxia.proto.ListRequest;
import java.time.Clock;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/streamnative/oxia/client/AsyncOxiaClientImpl.class */
class AsyncOxiaClientImpl implements AsyncOxiaClient {

    @NonNull
    private final OxiaStubManager stubManager;

    @NonNull
    private final ShardManager shardManager;

    @NonNull
    private final NotificationManager notificationManager;

    @NonNull
    private final BatchManager readBatchManager;

    @NonNull
    private final BatchManager writeBatchManager;

    @NonNull
    private final SessionManager sessionManager;

    @NonNull
    private final OperationMetrics metrics;
    private final AtomicLong sequence = new AtomicLong();
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    @NonNull
    public static CompletableFuture<AsyncOxiaClient> newInstance(@NonNull ClientConfig clientConfig) {
        if (clientConfig == null) {
            throw new NullPointerException("config is marked non-null but is null");
        }
        OxiaStubManager oxiaStubManager = new OxiaStubManager();
        ShardManager shardManager = new ShardManager(oxiaStubManager.getStub(clientConfig.serviceAddress()), clientConfig.metrics(), clientConfig.namespace());
        NotificationManager notificationManager = new NotificationManager(oxiaStubManager, shardManager, clientConfig.metrics());
        Objects.requireNonNull(shardManager);
        Function function = (v1) -> {
            return r0.leader(v1);
        };
        Objects.requireNonNull(oxiaStubManager);
        Function andThen = function.andThen(oxiaStubManager::getStub);
        shardManager.addCallback(notificationManager);
        BatchMetrics create = BatchMetrics.create(Clock.systemUTC(), clientConfig.metrics());
        BatchManager newReadBatchManager = BatchManager.newReadBatchManager(clientConfig, andThen, create);
        SessionManager sessionManager = new SessionManager(clientConfig, andThen);
        shardManager.addCallback(sessionManager);
        AsyncOxiaClientImpl asyncOxiaClientImpl = new AsyncOxiaClientImpl(oxiaStubManager, shardManager, notificationManager, newReadBatchManager, BatchManager.newWriteBatchManager(clientConfig, andThen, sessionManager, create), sessionManager, OperationMetrics.create(Clock.systemUTC(), clientConfig.metrics()));
        return shardManager.start().thenApply(r3 -> {
            return asyncOxiaClientImpl;
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<PutResult> put(String str, byte[] bArr, PutOption... putOptionArr) {
        OperationMetrics.Sample<PutResult> recordPut = this.metrics.recordPut(bArr == null ? 0L : bArr.length);
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(bArr);
            Set<PutOption> validate = PutOption.validate(putOptionArr);
            this.writeBatchManager.getBatcher(this.shardManager.get(str)).add(new Operation.WriteOperation.PutOperation(this.sequence.getAndIncrement(), completableFuture, str, bArr, PutOption.toVersionId(validate), PutOption.toEphemeral(validate)));
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
        Objects.requireNonNull(recordPut);
        return completableFuture.whenComplete((v1, v2) -> {
            r1.stop(v1, v2);
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Boolean> delete(String str, DeleteOption... deleteOptionArr) {
        OperationMetrics.Sample<Boolean> recordDelete = this.metrics.recordDelete();
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Set<DeleteOption> validate = DeleteOption.validate(deleteOptionArr);
            this.writeBatchManager.getBatcher(this.shardManager.get(str)).add(new Operation.WriteOperation.DeleteOperation(this.sequence.getAndIncrement(), completableFuture, str, DeleteOption.toVersionId(validate)));
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
        Objects.requireNonNull(recordDelete);
        return completableFuture.whenComplete((v1, v2) -> {
            r1.stop(v1, v2);
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<Void> deleteRange(String str, String str2) {
        CompletableFuture<Void> failedFuture;
        OperationMetrics.Sample<Void> recordDeleteRange = this.metrics.recordDeleteRange();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(str2);
            Stream<Long> stream = this.shardManager.getAll().stream();
            BatchManager batchManager = this.writeBatchManager;
            Objects.requireNonNull(batchManager);
            failedFuture = CompletableFuture.allOf((CompletableFuture[]) ((List) stream.map((v1) -> {
                return r1.getBatcher(v1);
            }).map(batcher -> {
                CompletableFuture completableFuture = new CompletableFuture();
                batcher.add(new Operation.WriteOperation.DeleteRangeOperation(this.sequence.getAndIncrement(), completableFuture, str, str2));
                return completableFuture;
            }).collect(Collectors.toList())).toArray(new CompletableFuture[0]));
        } catch (RuntimeException e) {
            failedFuture = CompletableFuture.failedFuture(e);
        }
        Objects.requireNonNull(recordDeleteRange);
        return failedFuture.whenComplete((v1, v2) -> {
            r1.stop(v1, v2);
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<GetResult> get(String str) {
        OperationMetrics.Sample<GetResult> recordGet = this.metrics.recordGet();
        CompletableFuture completableFuture = new CompletableFuture();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            this.readBatchManager.getBatcher(this.shardManager.get(str)).add(new Operation.ReadOperation.GetOperation(this.sequence.getAndIncrement(), completableFuture, str));
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
        Objects.requireNonNull(recordGet);
        return completableFuture.whenComplete((v1, v2) -> {
            r1.stop(v1, v2);
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    @NonNull
    public CompletableFuture<List<String>> list(String str, String str2) {
        CompletableFuture failedFuture;
        OperationMetrics.Sample<List<String>> recordList = this.metrics.recordList();
        try {
            checkIfClosed();
            Objects.requireNonNull(str);
            Objects.requireNonNull(str2);
            failedFuture = Flux.fromIterable(this.shardManager.getAll()).flatMap(l -> {
                return list(l.longValue(), str, str2);
            }).collectList().toFuture();
        } catch (Exception e) {
            failedFuture = CompletableFuture.failedFuture(e);
        }
        Objects.requireNonNull(recordList);
        return failedFuture.whenComplete((v1, v2) -> {
            r1.stop(v1, v2);
        });
    }

    @Override // io.streamnative.oxia.client.api.AsyncOxiaClient
    public void notifications(@NonNull Consumer<Notification> consumer) {
        if (consumer == null) {
            throw new NullPointerException("notificationCallback is marked non-null but is null");
        }
        checkIfClosed();
        this.notificationManager.registerCallback(consumer);
    }

    @NonNull
    private Flux<String> list(long j, @NonNull String str, @NonNull String str2) {
        if (str == null) {
            throw new NullPointerException("startKeyInclusive is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("endKeyExclusive is marked non-null but is null");
        }
        OxiaStub stub = this.stubManager.getStub(this.shardManager.leader(j));
        return stub.reactor().list(ListRequest.newBuilder().setShardId(j).setStartInclusive(str).setEndExclusive(str2).build()).flatMapIterable((v0) -> {
            return v0.getKeysList();
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.readBatchManager.close();
        this.writeBatchManager.close();
        this.sessionManager.close();
        this.notificationManager.close();
        this.shardManager.close();
        this.stubManager.close();
    }

    private void checkIfClosed() {
        if (this.closed) {
            throw new IllegalStateException("Client has been closed");
        }
    }

    AsyncOxiaClientImpl(@NonNull OxiaStubManager oxiaStubManager, @NonNull ShardManager shardManager, @NonNull NotificationManager notificationManager, @NonNull BatchManager batchManager, @NonNull BatchManager batchManager2, @NonNull SessionManager sessionManager, @NonNull OperationMetrics operationMetrics) {
        if (oxiaStubManager == null) {
            throw new NullPointerException("stubManager is marked non-null but is null");
        }
        if (shardManager == null) {
            throw new NullPointerException("shardManager is marked non-null but is null");
        }
        if (notificationManager == null) {
            throw new NullPointerException("notificationManager is marked non-null but is null");
        }
        if (batchManager == null) {
            throw new NullPointerException("readBatchManager is marked non-null but is null");
        }
        if (batchManager2 == null) {
            throw new NullPointerException("writeBatchManager is marked non-null but is null");
        }
        if (sessionManager == null) {
            throw new NullPointerException("sessionManager is marked non-null but is null");
        }
        if (operationMetrics == null) {
            throw new NullPointerException("metrics is marked non-null but is null");
        }
        this.stubManager = oxiaStubManager;
        this.shardManager = shardManager;
        this.notificationManager = notificationManager;
        this.readBatchManager = batchManager;
        this.writeBatchManager = batchManager2;
        this.sessionManager = sessionManager;
        this.metrics = operationMetrics;
    }
}
