package cz.o2.proxima.direct.pubsub;

import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.pubsub.proto.PubSub;
import cz.o2.proxima.pubsub.shaded.com.google.api.core.ApiFutureCallback;
import cz.o2.proxima.pubsub.shaded.com.google.api.core.ApiFutures;
import cz.o2.proxima.pubsub.shaded.com.google.cloud.pubsub.v1.Publisher;
import cz.o2.proxima.pubsub.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.pubsub.shaded.com.google.protobuf.ByteString;
import cz.o2.proxima.pubsub.shaded.com.google.protobuf.Timestamp;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.ProjectTopicName;
import cz.o2.proxima.pubsub.shaded.com.google.pubsub.v1.PubsubMessage;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/o2/proxima/direct/pubsub/PubSubWriter.class */
public class PubSubWriter extends AbstractOnlineAttributeWriter implements OnlineAttributeWriter {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PubSubWriter.class);
    private final PubSubAccessor accessor;
    private final Context context;
    private final AtomicInteger inflight;
    private final Serializable flightLock;
    private volatile boolean closed;
    private transient boolean initialized;
    private transient Publisher publisher;
    private transient ExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PubSubWriter(PubSubAccessor pubSubAccessor, Context context) {
        super(pubSubAccessor.getEntityDescriptor(), pubSubAccessor.getUri());
        this.inflight = new AtomicInteger();
        this.flightLock = new Serializable() { // from class: cz.o2.proxima.direct.pubsub.PubSubWriter.1
        };
        this.closed = false;
        this.initialized = false;
        this.accessor = pubSubAccessor;
        this.context = context;
    }

    synchronized void initialize() {
        if (this.initialized) {
            return;
        }
        try {
            this.publisher = newPublisher(this.accessor.getProject(), this.accessor.getTopic());
            this.executor = this.context.getExecutorService();
            this.initialized = true;
            this.closed = false;
        } catch (IOException e) {
            if (this.publisher != null) {
                ExceptionUtils.unchecked(() -> {
                    this.publisher.shutdown();
                });
            }
            throw new RuntimeException(e);
        }
    }

    @VisibleForTesting
    Publisher newPublisher(String str, String str2) throws IOException {
        return Publisher.newBuilder(ProjectTopicName.of(str, str2)).build();
    }

    public synchronized void write(final StreamElement streamElement, final CommitCallback commitCallback) {
        initialize();
        log.debug("Writing data {} to {}", streamElement, getUri());
        try {
            if (this.inflight.incrementAndGet() >= 1000) {
                while (this.inflight.get() >= 1000) {
                    synchronized (this.flightLock) {
                        this.flightLock.wait(1000L);
                    }
                }
            }
            ApiFutures.addCallback(this.publisher.publish(PubsubMessage.newBuilder().setMessageId(streamElement.getUuid()).setPublishTime(Timestamp.newBuilder().setSeconds(streamElement.getStamp() / 1000).setNanos(((int) (streamElement.getStamp() % 1000)) * 1000000)).setData(PubSub.KeyValue.newBuilder().setKey(streamElement.getKey()).setAttribute(streamElement.getAttribute()).setDelete(streamElement.isDelete()).setDeleteWildcard(streamElement.isDeleteWildcard()).setValue(streamElement.isDelete() ? ByteString.EMPTY : ByteString.copyFrom(streamElement.getValue())).setStamp(streamElement.getStamp()).build().toByteString()).build()), new ApiFutureCallback<String>() { // from class: cz.o2.proxima.direct.pubsub.PubSubWriter.2
                private void handle(boolean z, Throwable th) {
                    commitCallback.commit(z, th);
                    if (PubSubWriter.this.inflight.getAndDecrement() >= 1000 || PubSubWriter.this.closed) {
                        synchronized (PubSubWriter.this.flightLock) {
                            PubSubWriter.this.flightLock.notifyAll();
                        }
                    }
                }

                @Override // cz.o2.proxima.pubsub.shaded.com.google.api.core.ApiFutureCallback
                public void onFailure(Throwable th) {
                    PubSubWriter.log.warn("Failed to publish element {} to pubsub", streamElement, th);
                    handle(false, th);
                }

                @Override // cz.o2.proxima.pubsub.shaded.com.google.api.core.ApiFutureCallback
                public void onSuccess(String str) {
                    PubSubWriter.log.debug("Committing processing of {} with success", streamElement);
                    handle(true, null);
                }
            }, this.executor);
        } catch (Throwable th) {
            log.warn("Failed to publish {} to pubsub", streamElement, th);
            commitCallback.commit(false, th);
        }
    }

    public synchronized void close() {
        if (this.publisher != null) {
            try {
                this.closed = true;
                while (this.inflight.get() != 0) {
                    synchronized (this.flightLock) {
                        this.flightLock.wait(100L);
                    }
                }
                this.executor.shutdown();
                this.executor.awaitTermination(10L, TimeUnit.SECONDS);
                this.publisher.shutdown();
            } catch (Exception e) {
                log.warn("Failed to shutdown publisher {}", this.publisher, e);
            }
            this.publisher = null;
            this.initialized = false;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -836802267:
                if (implMethodName.equals("lambda$initialize$c5b2a3c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/pubsub/PubSubWriter") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    PubSubWriter pubSubWriter = (PubSubWriter) serializedLambda.getCapturedArg(0);
                    return () -> {
                        this.publisher.shutdown();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
