package io.camunda.zeebe.backup.s3;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Phaser;
import java.util.function.BiFunction;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/backup/s3/AsyncAggregatingSubscriber.class */
public class AsyncAggregatingSubscriber<T> implements Subscriber<CompletableFuture<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncAggregatingSubscriber.class);
    final ConcurrentLinkedDeque<T> results = new ConcurrentLinkedDeque<>();
    final Phaser phaser = new Phaser(1);
    private Subscription subscription;
    private final long parallelism;

    public AsyncAggregatingSubscriber(long j) {
        this.parallelism = j;
    }

    public void onSubscribe(Subscription subscription) {
        LOG.trace("Subscription started");
        this.phaser.register();
        this.subscription = subscription;
        subscription.request(this.parallelism);
    }

    public void onNext(CompletableFuture<T> completableFuture) {
        LOG.trace("Received next future: {}", completableFuture);
        this.phaser.register();
        completableFuture.handleAsync((BiFunction) (obj, th) -> {
            if (th == null) {
                LOG.trace("Completed: {}", obj);
                this.results.add(obj);
            } else {
                LOG.warn("Future failed, omitted from result", th);
            }
            this.phaser.arrive();
            this.subscription.request(1L);
            return null;
        });
    }

    public void onError(Throwable th) {
        LOG.warn("Subscription failed, result might be incomplete", th);
        this.phaser.forceTermination();
    }

    public void onComplete() {
        LOG.trace("Completed subscription");
        this.phaser.arrive();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Collection<T>> result() {
        Phaser phaser = this.phaser;
        Objects.requireNonNull(phaser);
        return CompletableFuture.supplyAsync(phaser::arriveAndAwaitAdvance).thenApply(num -> {
            LOG.trace("Result is available: {}", this.results);
            return this.results;
        });
    }
}
