package no.ssb.sagalog.pulsar;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
import java.util.Iterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import no.ssb.sagalog.SagaLog;
import no.ssb.sagalog.SagaLogEntry;
import no.ssb.sagalog.SagaLogEntryBuilder;
import no.ssb.sagalog.SagaLogEntryId;
import no.ssb.sagalog.SagaLogEntryType;
import no.ssb.sagalog.SagaLogId;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:no/ssb/sagalog/pulsar/PulsarSagaLog.class */
public class PulsarSagaLog implements SagaLog, AutoCloseable {
    private final PulsarSagaLogId sagaLogId;
    private final Consumer<byte[]> consumer;
    private final Producer<byte[]> producer;
    private final Deque<SagaLogEntry> cache = new ConcurrentLinkedDeque();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    public PulsarSagaLog(PulsarClient pulsarClient, SagaLogId sagaLogId) throws PulsarClientException {
        this.sagaLogId = (PulsarSagaLogId) sagaLogId;
        this.consumer = pulsarClient.newConsumer().topic(new String[]{this.sagaLogId.getTopic()}).subscriptionType(SubscriptionType.Exclusive).consumerName(this.sagaLogId.getNamespace() + "::" + this.sagaLogId.getClusterInstanceId()).subscriptionName("master").subscribe();
        this.producer = pulsarClient.newProducer().topic(this.sagaLogId.getTopic()).producerName(this.sagaLogId.getNamespace() + "::" + this.sagaLogId.getClusterInstanceId()).create();
        readExternal().forEachOrdered(sagaLogEntry -> {
            this.cache.add(sagaLogEntry);
        });
    }

    public SagaLogId id() {
        return this.sagaLogId;
    }

    private Stream<SagaLogEntry> readExternal() {
        final CompletableFuture sendAsync = this.producer.sendAsync(serialize(builder().control()));
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<SagaLogEntry>() { // from class: no.ssb.sagalog.pulsar.PulsarSagaLog.1
            MessageId previousmessageId = null;

            @Override // java.util.Iterator
            public boolean hasNext() {
                return !((MessageId) sendAsync.join()).equals(this.previousmessageId);
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public SagaLogEntry next() {
                Message receive;
                do {
                    try {
                        receive = PulsarSagaLog.this.consumer.receive(3, TimeUnit.SECONDS);
                    } catch (PulsarClientException e) {
                        throw new RuntimeException((Throwable) e);
                    }
                } while (receive == null);
                this.previousmessageId = receive.getMessageId();
                return PulsarSagaLog.this.deserialize(receive.getData()).id(new PulsarSagaLogEntryId(receive.getMessageId())).build();
            }
        }, 1296), false);
    }

    public CompletableFuture<SagaLogEntry> write(SagaLogEntryBuilder sagaLogEntryBuilder) {
        checkNotClosed();
        return this.producer.sendAsync(serialize(sagaLogEntryBuilder)).thenApply(messageId -> {
            SagaLogEntry build = sagaLogEntryBuilder.id(new PulsarSagaLogEntryId(messageId)).build();
            this.cache.add(build);
            return build;
        });
    }

    public CompletableFuture<Void> truncate(SagaLogEntryId sagaLogEntryId) {
        checkNotClosed();
        this.cache.stream().filter(sagaLogEntry -> {
            return sagaLogEntryId.equals(sagaLogEntry.getId());
        }).findFirst().orElseThrow();
        return this.consumer.acknowledgeCumulativeAsync(((PulsarSagaLogEntryId) sagaLogEntryId).id).thenAccept(r5 -> {
            Iterator<SagaLogEntry> it = this.cache.iterator();
            while (it.hasNext()) {
                SagaLogEntry next = it.next();
                it.remove();
                if (sagaLogEntryId.equals(next.getId())) {
                    return;
                }
            }
        });
    }

    public CompletableFuture<Void> truncate() {
        checkNotClosed();
        return this.cache.isEmpty() ? CompletableFuture.completedFuture(null) : truncate(this.cache.getLast().getId());
    }

    public Stream<SagaLogEntry> readIncompleteSagas() {
        checkNotClosed();
        return this.cache.stream().filter(sagaLogEntry -> {
            return SagaLogEntryType.Ignore != sagaLogEntry.getEntryType();
        });
    }

    public Stream<SagaLogEntry> readEntries(String str) {
        checkNotClosed();
        return this.cache.stream().filter(sagaLogEntry -> {
            return SagaLogEntryType.Ignore != sagaLogEntry.getEntryType();
        }).filter(sagaLogEntry2 -> {
            return str.equals(sagaLogEntry2.getExecutionId());
        });
    }

    private void checkNotClosed() {
        if (this.closed.get()) {
            throw new RuntimeException(String.format("Saga-log is already closed, saga-log-id: %s", this.sagaLogId));
        }
    }

    public String toString(SagaLogEntryId sagaLogEntryId) {
        return ((PulsarSagaLogEntryId) sagaLogEntryId).id.toString();
    }

    /* renamed from: fromString, reason: merged with bridge method [inline-methods] */
    public PulsarSagaLogEntryId m0fromString(String str) {
        String[] split = str.split(":");
        return new PulsarSagaLogEntryId(DefaultImplementation.newMessageId(Long.parseLong(split[0]), Long.parseLong(split[1]), Integer.parseInt(split[2])));
    }

    public byte[] toBytes(SagaLogEntryId sagaLogEntryId) {
        return ((PulsarSagaLogEntryId) sagaLogEntryId).id.toByteArray();
    }

    public SagaLogEntryId fromBytes(byte[] bArr) {
        try {
            return new PulsarSagaLogEntryId(MessageIdImpl.fromByteArray(bArr));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    byte[] serialize(SagaLogEntryBuilder sagaLogEntryBuilder) {
        return (sagaLogEntryBuilder.executionId() + " " + sagaLogEntryBuilder.entryType() + " " + sagaLogEntryBuilder.nodeId() + (sagaLogEntryBuilder.sagaName() == null ? "" : " " + sagaLogEntryBuilder.sagaName()) + (sagaLogEntryBuilder.jsonData() == null ? "" : " " + sagaLogEntryBuilder.jsonData())).getBytes(StandardCharsets.UTF_8);
    }

    SagaLogEntryBuilder deserialize(byte[] bArr) {
        String str = new String(bArr, StandardCharsets.UTF_8);
        SagaLogEntryBuilder builder = builder();
        int indexOf = str.indexOf(32);
        String substring = str.substring(0, indexOf);
        String substring2 = str.substring(indexOf + 1);
        builder.executionId(substring);
        int indexOf2 = substring2.indexOf(32);
        SagaLogEntryType valueOf = SagaLogEntryType.valueOf(substring2.substring(0, indexOf2));
        String substring3 = substring2.substring(indexOf2 + 1);
        builder.entryType(valueOf);
        int indexOf3 = substring3.indexOf(32);
        if (indexOf3 == -1) {
            return builder.nodeId(substring3);
        }
        String substring4 = substring3.substring(0, indexOf3);
        String substring5 = substring3.substring(indexOf3 + 1);
        builder.nodeId(substring4);
        if (!"S".equals(substring4)) {
            int indexOf4 = substring5.indexOf(123);
            return indexOf4 == -1 ? builder : builder.jsonData(substring5.substring(indexOf4));
        }
        int indexOf5 = substring5.indexOf(123);
        if (indexOf5 == -1) {
            return builder.sagaName(substring5.substring(0, substring5.length() - 1));
        }
        return builder.sagaName(substring5.substring(0, indexOf5 - 1)).jsonData(substring5.substring(indexOf5));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws PulsarClientException {
        if (this.closed.compareAndSet(false, true)) {
            try {
                this.producer.close();
            } finally {
                this.consumer.close();
            }
        }
    }
}
