package cz.o2.proxima.direct.pubsub;

import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.LogObserver;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.ObserverUtils;
import cz.o2.proxima.direct.commitlog.Offset;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.direct.pubsub.proto.PubSub;
import cz.o2.proxima.functional.UnaryFunction;
import cz.o2.proxima.pubsub.shaded.com.google.api.gax.rpc.AlreadyExistsException;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.AckReplyConsumer;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.MessageReceiver;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.Subscriber;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import cz.o2.proxima.pubsub.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.pubsub.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.pubsub.shaded.com.google.common.collect.Iterables;
import cz.o2.proxima.pubsub.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.pubsub.shaded.com.google.protobuf.FieldMask;
import cz.o2.proxima.pubsub.shaded.com.google.protobuf.InvalidProtocolBufferException;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.ProjectSubscriptionName;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.ProjectTopicName;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.PubsubMessage;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.PushConfig;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.Subscription;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.UpdateSubscriptionRequest;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkSupplier;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.bp.Duration;

/* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReader.class */
class PubSubReader extends AbstractStorage implements CommitLogReader {
    /** Logger shared by the reader and its nested classes. */
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PubSubReader.class);
    /** Context from which the executor service is lazily obtained. */
    private final Context context;
    /** Project used to build both the topic name and the subscription names. */
    private final String project;
    /** Topic that automatically created subscriptions are attached to. */
    private final String topic;
    /** Passed (as milliseconds) to the subscriber builder as the maximal ack extension period. */
    private final int maxAckDeadline;
    /** Ack deadline used when creating a subscription; compared against the subscription's ack deadline in seconds. */
    private final int subscriptionAckDeadline;
    /** When {@code true}, a subscription is created (or its ack deadline updated) before subscribing. */
    private final boolean subscriptionAutoCreate;
    /** Duration handed to the watermark estimator after being rounded down to a multiple of 100 ms. */
    private final long watermarkEstimateDuration;
    /** Allowed timestamp skew handed to the watermark estimator. */
    private final long allowedTimestampSkew;
    /** Lazily initialized from {@link #context}; transient, so it is not part of the serialized form. */
    private transient ExecutorService executor;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Internal callback that the message receiver invokes for each message that was
     * successfully converted to a {@link StreamElement}.
     */
    @FunctionalInterface
    /* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReader$PubSubConsumer.class */
    public interface PubSubConsumer extends Serializable {
        /**
         * Processes a single element.
         *
         * @param streamElement the parsed element
         * @param watermarkSupplier supplier of the current watermark
         * @param ackReplyConsumer handle used to ack or nack the underlying PubSub message
         * @return {@code false} to make the receiver stop the subscriber, {@code true} to continue
         */
        boolean consume(StreamElement streamElement, WatermarkSupplier watermarkSupplier, AckReplyConsumer ackReplyConsumer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Offset of a PubSub consumer, made of the consumer name and a watermark value.
     *
     * <p>Two offsets are equal when both the consumer name and the watermark match.
     */
    @VisibleForTesting
    public static class PubSubOffset implements Offset {
        private final String consumerName;
        private final long watermark;

        /**
         * @param str name of the consumer, must not be {@code null}
         * @param j watermark carried by this offset
         */
        PubSubOffset(String str, long j) {
            Objects.requireNonNull(str);
            this.consumerName = str;
            this.watermark = j;
        }

        /** Returns a new partition identified by this offset's consumer name. */
        public Partition getPartition() {
            return new PubSubPartition(consumerName);
        }

        @Override
        public String toString() {
            return "PubSubOffset(consumerName=" + consumerName + ", watermark=" + watermark + ")";
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof PubSubOffset) {
                final PubSubOffset other = (PubSubOffset) obj;
                return other.consumerName.equals(consumerName) && other.watermark == watermark;
            }
            return false;
        }

        @Override
        public int hashCode() {
            return Objects.hash(consumerName, watermark);
        }

        public String getConsumerName() {
            return consumerName;
        }

        public long getWatermark() {
            return watermark;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubReader$PubSubPartition.class */
    /**
     * Partition of a PubSub reader, identified only by the consumer name.
     *
     * <p>The id is always {@code 0} and splitting yields the same instance repeated.
     */
    public static final class PubSubPartition implements Partition {
        private final String consumerName;

        /** @param str name of the consumer, must not be {@code null} */
        PubSubPartition(String str) {
            Objects.requireNonNull(str);
            this.consumerName = str;
        }

        public int getId() {
            return 0;
        }

        public boolean isSplittable() {
            return true;
        }

        /**
         * Returns a collection containing this very partition {@code i} times
         * (empty when {@code i} is not positive).
         */
        public Collection<Partition> split(int i) {
            PubSubReader.log.info("Splitting partition {} into {} parts", this, i);
            final List<Partition> parts = new ArrayList<>();
            for (int part = 0; part < i; part++) {
                parts.add(this);
            }
            return parts;
        }

        @Override
        public String toString() {
            return "PubSubPartition(" + consumerName + ")";
        }

        public String getConsumerName() {
            return consumerName;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the reader, copying all PubSub settings from the accessor.
     *
     * @param pubSubAccessor accessor providing entity descriptor, URI and PubSub configuration
     * @param context context later used to obtain the executor service
     */
    public PubSubReader(PubSubAccessor pubSubAccessor, Context context) {
        super(pubSubAccessor.getEntityDescriptor(), pubSubAccessor.getUri());
        this.context = context;

        // PubSub coordinates
        project = pubSubAccessor.getProject();
        topic = pubSubAccessor.getTopic();

        // acknowledgement and subscription settings
        maxAckDeadline = pubSubAccessor.getMaxAckDeadline();
        subscriptionAckDeadline = pubSubAccessor.getSubscriptionAckDeadline();
        subscriptionAutoCreate = pubSubAccessor.isSubscriptionAutoCreate();

        // watermark estimation settings
        watermarkEstimateDuration = pubSubAccessor.getWatermarkEstimateDuration();
        allowedTimestampSkew = pubSubAccessor.getAllowedTimestampSkew();
    }

    /**
     * Returns a single partition. No consumer name is available here, so the
     * partition is given a freshly generated {@code unnamed-consumer-<uuid>} name.
     */
    public List<Partition> getPartitions() {
        final Partition partition = new PubSubPartition(asConsumerName(null));
        return Arrays.asList(partition);
    }

    /**
     * Starts observing with per-message commits and no lower bound on the watermark.
     *
     * @param str consumer name, or {@code null} to have one generated
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    public ObserveHandle observe(@Nullable String str, Position position, LogObserver logObserver) {
        final long initialWatermark = Long.MIN_VALUE;
        return observe(str, position, initialWatermark, logObserver);
    }

    /**
     * Starts observing with per-message commits.
     *
     * <p>Each element is handed to the observer together with a committer that acks the
     * PubSub message on success (also recording the current watermark as committed) and
     * nacks it otherwise.
     *
     * @param str consumer name, or {@code null} to have one generated
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param j initial value of the committed watermark
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    private ObserveHandle observe(@Nullable String str, Position position, long j, LogObserver logObserver) {
        validatePosition(position);
        final String consumerName = asConsumerName(str);
        final AtomicLong committedWatermark = new AtomicLong(j);
        final PubSubConsumer consumer = (element, watermark, reply) -> {
            final LogObserver.OffsetCommitter committer = (success, error) -> {
                if (!success) {
                    if (error == null) {
                        log.info("Nacking message {} by request", element);
                    } else {
                        log.warn("Error during processing of {}", element, error);
                    }
                    reply.nack();
                    return;
                }
                log.debug("Confirming message {} to PubSub", element);
                committedWatermark.set(watermark.getWatermark());
                reply.ack();
            };
            try {
                final PubSubOffset offset = new PubSubOffset(consumerName, watermark.getWatermark());
                final boolean continueProcessing =
                        logObserver.onNext(element, ObserverUtils.asOnNextContext(committer, offset));
                if (!continueProcessing) {
                    logObserver.onCompleted();
                }
                return continueProcessing;
            } catch (Exception e) {
                log.error("Error calling onNext", e);
                // nack the message before propagating the failure to the receiver
                committer.fail(e);
                throw new RuntimeException(e);
            }
        };
        Objects.requireNonNull(logObserver);
        final UnaryFunction<Throwable, Boolean> errorHandler = logObserver::onError;
        // nothing to do on restart, and no initialization callback is registered
        final Runnable onRestart = () -> {
        };
        final Runnable onCancel = logObserver::onCancelled;
        return consume(consumerName, consumer, errorHandler, null, onRestart, onCancel, committedWatermark);
    }

    /**
     * Observes the given partitions with per-message commits.
     *
     * @param str consumer name; when {@code null} the name is taken from the partitions
     * @param collection partitions, which must all carry the same consumer name when {@code str} is {@code null}
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param z stop-at-current flag; {@code true} is rejected as unsupported
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    public ObserveHandle observePartitions(@Nullable String str, Collection<Partition> collection, Position position, boolean z, LogObserver logObserver) {
        validateNotStopAtCurrent(z);
        final String consumerName = findConsumerFromPartitions(str, collection);
        return observe(consumerName, position, logObserver);
    }

    /**
     * Starts bulk observation with no lower bound on the watermark.
     *
     * @param str consumer name, or {@code null} to have one generated
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param z stop-at-current flag; {@code true} is rejected as unsupported
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    public ObserveHandle observeBulk(@Nullable String str, Position position, boolean z, LogObserver logObserver) {
        final long initialWatermark = Long.MIN_VALUE;
        return observeBulk(str, position, z, initialWatermark, logObserver);
    }

    /**
     * Starts bulk observation: ack handles of received messages are queued and a commit
     * confirms (or rejects) all messages received up to the committed one.
     *
     * @param str consumer name, or {@code null} to have one generated
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param z stop-at-current flag; {@code true} is rejected as unsupported
     * @param j initial value of the committed watermark
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    private ObserveHandle observeBulk(@Nullable String str, Position position, boolean z, long j, LogObserver logObserver) {
        validateNotStopAtCurrent(z);
        validatePosition(position);
        // ack handles of messages that were delivered but not yet confirmed
        final AtomicReference<List<AckReplyConsumer>> unconfirmed = new AtomicReference<>(new ArrayList<>());
        // serializes calls to the observer
        final Object observerLock = new Object();
        // guards the unconfirmed list and the confirmed counter
        final Object listLock = new Object();
        // number of messages that were already acked or nacked
        final AtomicLong confirmedCount = new AtomicLong();
        final String consumerName = asConsumerName(str);
        final AtomicLong committedWatermark = new AtomicLong(j);
        final PubSubPartition partition = new PubSubPartition(consumerName);
        final PubSubConsumer consumer = (element, watermark, reply) -> {
            final long confirmUntil;
            synchronized (listLock) {
                final List<AckReplyConsumer> pending = unconfirmed.get();
                pending.add(reply);
                // absolute sequence number of this message within the whole observation
                confirmUntil = pending.size() + confirmedCount.get();
            }
            final LogObserver.OffsetCommitter committer =
                    createBulkCommitter(listLock, confirmUntil, confirmedCount, unconfirmed, watermark, committedWatermark);
            synchronized (observerLock) {
                try {
                    final PubSubOffset offset = new PubSubOffset(consumerName, watermark.getWatermark());
                    if (logObserver.onNext(element, ObserverUtils.asOnNextContext(committer, offset))) {
                        return true;
                    }
                    logObserver.onCompleted();
                    return false;
                } catch (Exception e) {
                    log.error("Error calling on next", e);
                    // nack the pending bulk before propagating the failure to the receiver
                    committer.fail(e);
                    throw new RuntimeException(e);
                }
            }
        };
        Objects.requireNonNull(logObserver);
        final UnaryFunction<Throwable, Boolean> errorHandler = logObserver::onError;
        // the same notification is used after the initial start and after each restart
        final Runnable notifyPartitions = () ->
                logObserver.onRepartition(ObserverUtils.asRepartitionContext(Arrays.asList(partition)));
        final Runnable onCancel = logObserver::onCancelled;
        return consume(consumerName, consumer, errorHandler, notifyPartitions, notifyPartitions, onCancel, committedWatermark);
    }

    /**
     * Creates a committer that acks or nacks every queued message up to a given sequence number.
     *
     * @param obj lock guarding {@code atomicReference} and {@code atomicLong}
     * @param j absolute sequence number of the last message covered by this committer
     * @param atomicLong number of messages already confirmed; advanced by each effective commit
     * @param atomicReference queue of ack handles not yet confirmed; replaced by its unconfirmed tail
     * @param watermarkSupplier supplier of the watermark stored on a successful commit
     * @param atomicLong2 holder of the committed watermark
     * @return the committer; it does nothing when the covered messages were already confirmed
     */
    private LogObserver.OffsetCommitter createBulkCommitter(Object obj, long j, AtomicLong atomicLong, AtomicReference<List<AckReplyConsumer>> atomicReference, WatermarkSupplier watermarkSupplier, AtomicLong atomicLong2) {
        return (success, error) -> {
            synchronized (obj) {
                // how many of the currently queued handles this commit covers
                final int count = (int) (j - atomicLong.get());
                if (count <= 0) {
                    return;
                }
                final Consumer<AckReplyConsumer> reply;
                if (success) {
                    log.debug("Bulk confirming {} messages", count);
                    reply = AckReplyConsumer::ack;
                    atomicLong2.set(watermarkSupplier.getWatermark());
                } else {
                    if (error != null) {
                        log.warn("Error during processing of last bulk", error);
                    } else {
                        log.info("Nacking last bulk by request");
                    }
                    reply = AckReplyConsumer::nack;
                }
                final List<AckReplyConsumer> pending = atomicReference.get();
                for (int idx = 0; idx < count; idx++) {
                    reply.accept(pending.get(idx));
                }
                atomicLong.addAndGet(count);
                // keep only the handles that arrived after the committed message
                atomicReference.set(Lists.newArrayList(pending.subList(count, pending.size())));
            }
        };
    }

    /**
     * Bulk-observes the given partitions with no lower bound on the watermark.
     *
     * @param str consumer name; when {@code null} the name is taken from the partitions
     * @param collection partitions, which must all carry the same consumer name when {@code str} is {@code null}
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param z stop-at-current flag; {@code true} is rejected as unsupported
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    public ObserveHandle observeBulkPartitions(@Nullable String str, Collection<Partition> collection, Position position, boolean z, LogObserver logObserver) {
        final String consumerName = findConsumerFromPartitions(str, collection);
        final long initialWatermark = Long.MIN_VALUE;
        return observeBulkWithMinWatermark(consumerName, position, z, initialWatermark, logObserver);
    }

    /**
     * Rejects stop-at-current requests and otherwise delegates to bulk observation
     * starting from the given committed watermark.
     *
     * @param str consumer name, or {@code null} to have one generated
     * @param position requested position; {@code Position.OLDEST} is rejected
     * @param z stop-at-current flag; {@code true} is rejected as unsupported
     * @param j initial value of the committed watermark
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     */
    private ObserveHandle observeBulkWithMinWatermark(@Nullable String str, Position position, boolean z, long j, LogObserver logObserver) {
        validateNotStopAtCurrent(z);
        // the flag was validated above, so the delegate is always called without it
        final boolean stopAtCurrent = false;
        return observeBulk(str, position, stopAtCurrent, j, logObserver);
    }

    /**
     * Bulk-observes starting from previously obtained offsets.
     *
     * <p>All offsets must be {@link PubSubOffset}s of the same consumer; the smallest of
     * their watermarks becomes the initial committed watermark.
     *
     * @param collection offsets to continue from
     * @param logObserver observer receiving the elements
     * @return handle of the running observation
     * @throws IllegalArgumentException when the offsets do not name exactly one consumer
     */
    public ObserveHandle observeBulkOffsets(Collection<Offset> collection, LogObserver logObserver) {
        final List<String> consumerNames = collection.stream()
                .map(PubSubOffset.class::cast)
                .map(PubSubOffset::getConsumerName)
                .distinct()
                .collect(Collectors.toList());
        Preconditions.checkArgument(consumerNames.size() == 1, "Offsets should be reading same consumer, got %s", consumerNames);
        final String consumerName = asConsumerName(Iterables.getOnlyElement(consumerNames));
        final long minWatermark = collection.stream()
                .map(PubSubOffset.class::cast)
                .mapToLong(PubSubOffset::getWatermark)
                .min()
                .orElse(Long.MIN_VALUE);
        return observeBulkWithMinWatermark(consumerName, Position.NEWEST, false, minWatermark, logObserver);
    }

    /**
     * Builds a subscriber for the given subscription, creating (or updating) the
     * subscription first when auto-creation is enabled.
     *
     * @param projectSubscriptionName subscription to read from
     * @param messageReceiver receiver of the delivered messages
     * @return a new, not yet started subscriber
     * @throws RuntimeException wrapping any non-{@link IOException} failure of the subscription setup
     */
    @VisibleForTesting
    Subscriber newSubscriber(ProjectSubscriptionName projectSubscriptionName, MessageReceiver messageReceiver) {
        if (this.subscriptionAutoCreate) {
            // try-with-resources guarantees the admin client is closed even when setup fails
            try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
                createSubscription(client, projectSubscriptionName);
            } catch (IOException e) {
                // an I/O failure is logged only; the subscriber is still built below
                log.error("Failed to close SubscriptionAdminClient", e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return Subscriber.newBuilder(projectSubscriptionName, messageReceiver)
                .setMaxAckExtensionPeriod(Duration.ofMillis(this.maxAckDeadline))
                .build();
    }

    /**
     * Creates a watermark estimator with a 100 ms step.
     *
     * @param j minimal watermark of the estimator
     * @return estimator whose duration is the configured one rounded down to a multiple of the step
     */
    @VisibleForTesting
    WatermarkEstimator createWatermarkEstimator(long j) {
        final long stepMs = 100L;
        final long durationMs = (this.watermarkEstimateDuration / stepMs) * stepMs;
        return WatermarkEstimator.newBuilder()
                .withMinWatermark(j)
                .withDurationMs(durationMs)
                .withAllowedTimestampSkew(this.allowedTimestampSkew)
                .withStepMs(stepMs)
                .build();
    }

    /**
     * Creates the subscription for the configured topic, or reconciles an existing one.
     *
     * <p>When the subscription already exists it must belong to the configured topic;
     * its ack deadline is updated if it differs from the configured value.
     *
     * @param subscriptionAdminClient client used for all subscription calls
     * @param projectSubscriptionName subscription to create or verify
     * @throws IllegalStateException when the existing subscription uses a different topic
     */
    private void createSubscription(SubscriptionAdminClient subscriptionAdminClient, ProjectSubscriptionName projectSubscriptionName) {
        final ProjectTopicName topicName = ProjectTopicName.of(this.project, this.topic);
        try {
            final PushConfig pushConfig = PushConfig.newBuilder().build();
            subscriptionAdminClient.createSubscription(projectSubscriptionName, topicName, pushConfig, this.subscriptionAckDeadline);
            log.info("Automatically creating subscription {} for topic {} with ackDeadline {} as requested", projectSubscriptionName, topicName, this.subscriptionAckDeadline);
        } catch (AlreadyExistsException e) {
            final Subscription existing = subscriptionAdminClient.getSubscription(projectSubscriptionName);
            final boolean sameTopic = existing.getTopic().equals(topicName.toString());
            if (!sameTopic) {
                throw new IllegalStateException("Existed subscription " + projectSubscriptionName.getSubscription() + " use topic " + existing.getTopic() + " which is different than configured " + topicName + ".");
            }
            if (existing.getAckDeadlineSeconds() == this.subscriptionAckDeadline) {
                log.debug("Subscription {} already exists. Skipping creation.", projectSubscriptionName);
                return;
            }
            // only the ack deadline is updated, as selected by the field mask
            final FieldMask mask = FieldMask.newBuilder().addPaths("ack_deadline_seconds").build();
            final Subscription desired = Subscription.newBuilder()
                    .setAckDeadlineSeconds(this.subscriptionAckDeadline)
                    .setName(projectSubscriptionName.toString())
                    .build();
            subscriptionAdminClient.updateSubscription(
                    UpdateSubscriptionRequest.newBuilder().setUpdateMask(mask).setSubscription(desired).build());
            log.info("Subscription ack deadline {} for subscription {} was different than configured: {}. Subscription updated.", existing.getAckDeadlineSeconds(), projectSubscriptionName, this.subscriptionAckDeadline);
        }
    }

    /**
     * Rejects reading from the oldest position.
     *
     * @param position requested position
     * @throws UnsupportedOperationException when {@code position} is {@code Position.OLDEST}
     */
    private void validatePosition(Position position) {
        if (position != Position.OLDEST) {
            return;
        }
        failUnsupported();
    }

    /**
     * Rejects requests to stop at the current data.
     *
     * @param z the stop-at-current flag
     * @throws UnsupportedOperationException when {@code z} is {@code true}
     */
    private void validateNotStopAtCurrent(boolean z) {
        if (!z) {
            return;
        }
        failUnsupported();
    }

    /**
     * Always throws, signalling that the requested mode of observation is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    private void failUnsupported() {
        final String reason = "PubSub can observe only current data.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Returns the given name, or a random {@code unnamed-consumer-<uuid>} name when it is {@code null}.
     *
     * @param str requested consumer name, may be {@code null}
     * @return a non-null consumer name
     */
    private String asConsumerName(String str) {
        if (str == null) {
            return "unnamed-consumer-" + UUID.randomUUID();
        }
        return str;
    }

    /**
     * Creates and starts the subscriber and returns the handle controlling it.
     *
     * @param str consumer name, used as the subscription name within the project
     * @param pubSubConsumer callback receiving the parsed elements
     * @param unaryFunction error handler passed on to the message receiver
     * @param runnable optional callback executed on the executor once the subscriber is running
     * @param runnable2 callback passed on to the message receiver
     * @param runnable3 callback run at the end of {@code close()} on the returned handle
     * @param atomicLong committed watermark; seeds the watermark estimator and is reported in offsets
     * @return handle of the running observation
     */
    private ObserveHandle consume(final String str, PubSubConsumer pubSubConsumer, UnaryFunction<Throwable, Boolean> unaryFunction, @Nullable Runnable runnable, Runnable runnable2, final Runnable runnable3, final AtomicLong atomicLong) {
        final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(this.project, str);
        final AtomicReference<Subscriber> subscriberRef = new AtomicReference<>();
        final AtomicBoolean cancelled = new AtomicBoolean();
        // the receiver needs a reference to itself, hence the holder
        final AtomicReference<MessageReceiver> receiverRef = new AtomicReference<>();
        final WatermarkEstimator estimator = createWatermarkEstimator(atomicLong.get());
        final MessageReceiver receiver = createMessageReceiver(
                subscriptionName, subscriberRef, cancelled, pubSubConsumer, estimator, unaryFunction, runnable2, receiverRef);
        receiverRef.set(receiver);
        subscriberRef.set(newSubscriber(subscriptionName, receiverRef.get()));
        subscriberRef.get().startAsync();
        if (runnable != null) {
            executor().submit(() -> {
                subscriberRef.get().awaitRunning();
                runnable.run();
            });
        }
        return new ObserveHandle() {
            public void close() {
                PubSubReader.log.debug("Cancelling observer {}", str);
                // messages that still arrive after this point get nacked by the receiver
                cancelled.set(true);
                final Subscriber stopped = PubSubReader.this.stopAsync(subscriberRef);
                if (stopped != null) {
                    stopped.awaitTerminated();
                }
                runnable3.run();
            }

            public List<Offset> getCommittedOffsets() {
                final Offset committed = new PubSubOffset(str, atomicLong.get());
                return Arrays.asList(committed);
            }

            public void resetOffsets(List<Offset> list) {
                // intentionally a no-op
            }

            public List<Offset> getCurrentOffsets() {
                return getCommittedOffsets();
            }

            public void waitUntilReady() throws InterruptedException {
                subscriberRef.get().awaitRunning();
            }
        };
    }

    /**
     * Creates the receiver that converts PubSub messages to elements and feeds the consumer.
     *
     * <p>Messages arriving after cancellation are nacked, unparseable messages are acked
     * and skipped. When processing throws, the error handler decides between terminating
     * and restarting the subscriber.
     *
     * @param projectSubscriptionName subscription used when the subscriber is re-created
     * @param atomicReference holder of the currently running subscriber
     * @param atomicBoolean cancellation flag
     * @param pubSubConsumer callback receiving the parsed elements
     * @param watermarkEstimator estimator updated with the stamp of each element
     * @param unaryFunction error handler; {@code Boolean.TRUE} requests a restart
     * @param runnable callback run before the subscriber is re-created on restart
     * @param atomicReference2 holder of the receiver itself, used for the new subscriber
     * @return the message receiver
     */
    private MessageReceiver createMessageReceiver(ProjectSubscriptionName projectSubscriptionName, AtomicReference<Subscriber> atomicReference, AtomicBoolean atomicBoolean, PubSubConsumer pubSubConsumer, WatermarkEstimator watermarkEstimator, UnaryFunction<Throwable, Boolean> unaryFunction, Runnable runnable, AtomicReference<MessageReceiver> atomicReference2) {
        return (pubsubMessage, ackReplyConsumer) -> {
            try {
                log.trace("Received message {}", pubsubMessage);
                if (atomicBoolean.get()) {
                    log.debug("Returning rejected message {}", pubsubMessage);
                    ackReplyConsumer.nack();
                    return;
                }
                final Optional<StreamElement> parsed = toElement(getEntityDescriptor(), pubsubMessage);
                if (!parsed.isPresent()) {
                    log.warn("Skipping unparseable element {}", pubsubMessage);
                    ackReplyConsumer.ack();
                    return;
                }
                final StreamElement element = parsed.get();
                final long previousWatermark = watermarkEstimator.getWatermark();
                watermarkEstimator.add(element.getStamp());
                if (watermarkEstimator.getWatermark() < previousWatermark) {
                    log.warn("Element {} is moving watermark backwards of {} ms. If this happens too often, then it is likely you need to extend ack deadline.", element, previousWatermark - watermarkEstimator.getWatermark());
                }
                if (!pubSubConsumer.consume(element, watermarkEstimator, ackReplyConsumer)) {
                    log.info("Terminating consumption by request.");
                    stopAsync(atomicReference);
                }
            } catch (Throwable th) {
                log.error("Failed to consume element {}", pubsubMessage, th);
                if (!Boolean.TRUE.equals(unaryFunction.apply(th))) {
                    log.info("Terminating consumption after error.");
                    stopAsync(atomicReference);
                    return;
                }
                log.info("Restarting consumption by request.");
                // stopAsync returns null when the subscriber was already stopped elsewhere,
                // in which case there is nothing to wait for
                final Subscriber stopped = stopAsync(atomicReference);
                if (stopped != null) {
                    stopped.awaitTerminated();
                }
                runnable.run();
                atomicReference.set(newSubscriber(projectSubscriptionName, atomicReference2.get()));
                atomicReference.get().startAsync().awaitRunning();
            }
        };
    }

    /**
     * Clears the holder and asynchronously stops the subscriber it contained.
     *
     * @param atomicReference holder of the subscriber; set to {@code null} by this call
     * @return the subscriber that was asked to stop, or {@code null} when the holder was empty
     */
    Subscriber stopAsync(AtomicReference<Subscriber> atomicReference) {
        final Subscriber current = atomicReference.getAndSet(null);
        if (current == null) {
            return null;
        }
        log.info("Closing subscriber {}", current);
        current.stopAsync();
        return current;
    }

    /**
     * Returns the executor service, obtaining it from the context on first use.
     *
     * @return the cached executor service
     */
    ExecutorService executor() {
        ExecutorService current = this.executor;
        if (current == null) {
            current = this.context.getExecutorService();
            this.executor = current;
        }
        return current;
    }

    /**
     * Converts a PubSub message to a stream element of the given entity.
     *
     * @param entityDescriptor entity the element belongs to
     * @param pubsubMessage message whose data holds a serialized {@code PubSub.KeyValue}
     * @return the element (delete, wildcard delete or upsert), or empty when the payload
     *     cannot be parsed or names an attribute unknown to the entity
     */
    static Optional<StreamElement> toElement(EntityDescriptor entityDescriptor, PubsubMessage pubsubMessage) {
        final PubSub.KeyValue parsed;
        try {
            parsed = PubSub.KeyValue.parseFrom(pubsubMessage.getData());
        } catch (InvalidProtocolBufferException e) {
            log.warn("Failed to parse message {}", pubsubMessage, e);
            // nothing below can work without the parsed payload
            return Optional.empty();
        }
        final String uuid = pubsubMessage.getMessageId();
        final long stamp = parsed.getStamp();
        final Optional<?> attribute = entityDescriptor.findAttribute(parsed.getAttribute(), true);
        if (!attribute.isPresent()) {
            log.warn("Failed to find attribute {} in entity {}", parsed.getAttribute(), entityDescriptor);
            return Optional.empty();
        }
        final AttributeDescriptor<?> descriptor = (AttributeDescriptor<?>) attribute.get();
        if (parsed.getDelete()) {
            return Optional.of(StreamElement.delete(entityDescriptor, descriptor, uuid, parsed.getKey(), parsed.getAttribute(), stamp));
        }
        if (parsed.getDeleteWildcard()) {
            return Optional.of(StreamElement.deleteWildcard(entityDescriptor, descriptor, uuid, parsed.getKey(), parsed.getAttribute(), stamp));
        }
        return Optional.of(StreamElement.upsert(entityDescriptor, descriptor, uuid, parsed.getKey(), parsed.getAttribute(), stamp, parsed.getValue().toByteArray()));
    }

    /**
     * Reports whether offsets of this reader are externalizable.
     *
     * @return always {@code false}
     */
    public boolean hasExternalizableOffsets() {
        return false;
    }

    /**
     * Resolves the consumer name, preferring the explicit name over the partitions.
     *
     * @param str explicit consumer name, may be {@code null}
     * @param collection {@link PubSubPartition}s consulted only when {@code str} is {@code null}
     * @return {@code str} when given, otherwise the single consumer name shared by all partitions
     * @throws IllegalArgumentException when the partitions do not share exactly one consumer name
     */
    private String findConsumerFromPartitions(String str, Collection<Partition> collection) {
        if (str != null) {
            return str;
        }
        final Set<String> consumerNames = collection.stream()
                .map(PubSubPartition.class::cast)
                .map(PubSubPartition::getConsumerName)
                .collect(Collectors.toSet());
        Preconditions.checkArgument(consumerNames.size() == 1, "Please provide partitions originating from single #split partition. Got %s", collection);
        return Iterables.getOnlyElement(consumerNames);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1516004535:
                if (implMethodName.equals("lambda$observeBulk$e4cf05f8$1")) {
                    z = true;
                    break;
                }
                break;
            case -1349867671:
                if (implMethodName.equals("onError")) {
                    z = false;
                    break;
                }
                break;
            case -828612477:
                if (implMethodName.equals("lambda$observe$95c3c70a$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1260838585:
                if (implMethodName.equals("lambda$observe$2d64c109$1")) {
                    z = 2;
                    break;
                }
                break;
            case 2044814190:
                if (implMethodName.equals("lambda$createBulkCommitter$93d0b822$1")) {
                    z = 4;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/LogObserver") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Z")) {
                    LogObserver logObserver = (LogObserver) serializedLambda.getCapturedArg(0);
                    return logObserver::onError;
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/functional/UnaryFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/commitlog/LogObserver") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Throwable;)Z")) {
                    LogObserver logObserver2 = (LogObserver) serializedLambda.getCapturedArg(0);
                    return logObserver2::onError;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader$PubSubConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("consume") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/time/WatermarkSupplier;Lcom/google/cloud/pubsub/v1/AckReplyConsumer;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/util/concurrent/atomic/AtomicReference;Ljava/util/concurrent/atomic/AtomicLong;Ljava/util/concurrent/atomic/AtomicLong;Ljava/lang/Object;Ljava/lang/String;Lcz/o2/proxima/direct/commitlog/LogObserver;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/time/WatermarkSupplier;Lcom/google/cloud/pubsub/v1/AckReplyConsumer;)Z")) {
                    PubSubReader pubSubReader = (PubSubReader) serializedLambda.getCapturedArg(0);
                    Object capturedArg = serializedLambda.getCapturedArg(1);
                    AtomicReference atomicReference = (AtomicReference) serializedLambda.getCapturedArg(2);
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(3);
                    AtomicLong atomicLong2 = (AtomicLong) serializedLambda.getCapturedArg(4);
                    Object capturedArg2 = serializedLambda.getCapturedArg(5);
                    String str = (String) serializedLambda.getCapturedArg(6);
                    LogObserver logObserver3 = (LogObserver) serializedLambda.getCapturedArg(7);
                    return (streamElement, watermarkSupplier, ackReplyConsumer) -> {
                        long size;
                        synchronized (capturedArg) {
                            ((List) atomicReference.get()).add(ackReplyConsumer);
                            size = r0.size() + atomicLong.get();
                        }
                        LogObserver.OffsetCommitter createBulkCommitter = createBulkCommitter(capturedArg, size, atomicLong, atomicReference, watermarkSupplier, atomicLong2);
                        synchronized (capturedArg2) {
                            try {
                                if (logObserver3.onNext(streamElement, ObserverUtils.asOnNextContext(createBulkCommitter, new PubSubOffset(str, watermarkSupplier.getWatermark())))) {
                                    return true;
                                }
                                logObserver3.onCompleted();
                                return false;
                            } catch (Exception e) {
                                log.error("Error calling on next", (Throwable) e);
                                createBulkCommitter.fail(e);
                                throw new RuntimeException(e);
                            }
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/commitlog/LogObserver$OffsetCommitter") && serializedLambda.getFunctionalInterfaceMethodName().equals("commit") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(ZLjava/lang/Throwable;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/storage/StreamElement;Ljava/util/concurrent/atomic/AtomicLong;Lcz/o2/proxima/time/WatermarkSupplier;Lcom/google/cloud/pubsub/v1/AckReplyConsumer;ZLjava/lang/Throwable;)V")) {
                    StreamElement streamElement2 = (StreamElement) serializedLambda.getCapturedArg(0);
                    AtomicLong atomicLong3 = (AtomicLong) serializedLambda.getCapturedArg(1);
                    WatermarkSupplier watermarkSupplier2 = (WatermarkSupplier) serializedLambda.getCapturedArg(2);
                    AckReplyConsumer ackReplyConsumer2 = (AckReplyConsumer) serializedLambda.getCapturedArg(3);
                    return (z2, th) -> {
                        if (z2) {
                            log.debug("Confirming message {} to PubSub", streamElement2);
                            atomicLong3.set(watermarkSupplier2.getWatermark());
                            ackReplyConsumer2.ack();
                        } else {
                            if (th != null) {
                                log.warn("Error during processing of {}", streamElement2, th);
                            } else {
                                log.info("Nacking message {} by request", streamElement2);
                            }
                            ackReplyConsumer2.nack();
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader$PubSubConsumer") && serializedLambda.getFunctionalInterfaceMethodName().equals("consume") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/time/WatermarkSupplier;Lcom/google/cloud/pubsub/v1/AckReplyConsumer;)Z") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;Ljava/lang/String;Lcz/o2/proxima/direct/commitlog/LogObserver;Lcz/o2/proxima/storage/StreamElement;Lcz/o2/proxima/time/WatermarkSupplier;Lcom/google/cloud/pubsub/v1/AckReplyConsumer;)Z")) {
                    AtomicLong atomicLong4 = (AtomicLong) serializedLambda.getCapturedArg(0);
                    String str2 = (String) serializedLambda.getCapturedArg(1);
                    LogObserver logObserver4 = (LogObserver) serializedLambda.getCapturedArg(2);
                    return (streamElement3, watermarkSupplier3, ackReplyConsumer3) -> {
                        LogObserver.OffsetCommitter offsetCommitter = (z22, th2) -> {
                            if (z22) {
                                log.debug("Confirming message {} to PubSub", streamElement3);
                                atomicLong4.set(watermarkSupplier3.getWatermark());
                                ackReplyConsumer3.ack();
                            } else {
                                if (th2 != null) {
                                    log.warn("Error during processing of {}", streamElement3, th2);
                                } else {
                                    log.info("Nacking message {} by request", streamElement3);
                                }
                                ackReplyConsumer3.nack();
                            }
                        };
                        try {
                            boolean onNext = logObserver4.onNext(streamElement3, ObserverUtils.asOnNextContext(offsetCommitter, new PubSubOffset(str2, watermarkSupplier3.getWatermark())));
                            if (!onNext) {
                                logObserver4.onCompleted();
                            }
                            return onNext;
                        } catch (Exception e) {
                            log.error("Error calling onNext", (Throwable) e);
                            offsetCommitter.fail(e);
                            throw new RuntimeException(e);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/commitlog/LogObserver$OffsetCommitter") && serializedLambda.getFunctionalInterfaceMethodName().equals("commit") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(ZLjava/lang/Throwable;)V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubReader") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;JLjava/util/concurrent/atomic/AtomicLong;Ljava/util/concurrent/atomic/AtomicLong;Lcz/o2/proxima/time/WatermarkSupplier;Ljava/util/concurrent/atomic/AtomicReference;ZLjava/lang/Throwable;)V")) {
                    Object capturedArg3 = serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    AtomicLong atomicLong5 = (AtomicLong) serializedLambda.getCapturedArg(2);
                    AtomicLong atomicLong6 = (AtomicLong) serializedLambda.getCapturedArg(3);
                    WatermarkSupplier watermarkSupplier4 = (WatermarkSupplier) serializedLambda.getCapturedArg(4);
                    AtomicReference atomicReference2 = (AtomicReference) serializedLambda.getCapturedArg(5);
                    return (z3, th2) -> {
                        Consumer consumer;
                        synchronized (capturedArg3) {
                            int i = (int) (longValue - atomicLong5.get());
                            if (i > 0) {
                                if (z3) {
                                    log.debug("Bulk confirming {} messages", Integer.valueOf(i));
                                    consumer = (v0) -> {
                                        v0.ack();
                                    };
                                    atomicLong6.set(watermarkSupplier4.getWatermark());
                                } else {
                                    if (th2 != null) {
                                        log.warn("Error during processing of last bulk", th2);
                                    } else {
                                        log.info("Nacking last bulk by request");
                                    }
                                    consumer = (v0) -> {
                                        v0.nack();
                                    };
                                }
                                List list = (List) atomicReference2.get();
                                for (int i2 = 0; i2 < i; i2++) {
                                    consumer.accept((AckReplyConsumer) list.get(i2));
                                }
                                atomicLong5.addAndGet(i);
                                atomicReference2.set(Lists.newArrayList(list.subList(i, list.size())));
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
