package ch.streamly.chronicle.flux;

import ch.streamly.chronicle.flux.replay.ReplayFlux;
import java.io.File;
import java.util.function.Function;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:ch/streamly/chronicle/flux/ChronicleStore.class */
/**
 * {@link FluxStore} implementation that persists values in a Chronicle Queue and exposes the
 * stored values as {@link Flux} streams.
 *
 * <p>Each value is written as a 4-byte length followed by the bytes produced by the configured
 * serializer; reading applies the configured deserializer to the same bytes.
 *
 * @param <T> type of the values stored in and retrieved from the queue
 */
public class ChronicleStore<T> implements FluxStore<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ChronicleStore.class);

    /** Pause, in milliseconds, before polling again when the queue had nothing to read. */
    private static final long EMPTY_QUEUE_PAUSE_MS = 10L;

    /** Pause, in milliseconds, before polling again when downstream has no outstanding demand. */
    private static final long NO_DEMAND_PAUSE_MS = 100L;

    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;
    private final SingleChronicleQueue queue;
    private final RollCycle rollCycle;

    /**
     * Fluent builder for {@link ChronicleStore}. The roll cycle defaults to
     * {@link RollCycles#DAILY}; the other properties have no default.
     *
     * @param <T> type of the values handled by the store being built
     */
    public static final class ChronicleStoreBuilder<T> {
        private String path;
        private Function<T, byte[]> serializer;
        private Function<byte[], T> deserializer;
        private RollCycle rollCycle = RollCycles.DAILY;

        private ChronicleStoreBuilder() {
        }

        /**
         * @param path path handed to the Chronicle Queue builder
         * @return this builder
         */
        public ChronicleStoreBuilder<T> path(String path) {
            this.path = path;
            return this;
        }

        /**
         * @param serializer converts a value to the bytes written to the queue
         * @return this builder
         */
        public ChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) {
            this.serializer = serializer;
            return this;
        }

        /**
         * @param deserializer converts bytes read from the queue back to a value
         * @return this builder
         */
        public ChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
            return this;
        }

        /**
         * @param rollCycle roll cycle used by the underlying queue
         * @return this builder
         */
        public ChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) {
            this.rollCycle = rollCycle;
            return this;
        }

        /**
         * @return a new store configured from the current builder state
         */
        public ChronicleStore<T> build() {
            return new ChronicleStore<>(this);
        }
    }

    /** Controls what the tailing loop does once the queue has no more values to read. */
    public enum ReaderType {
        /** Keep polling the queue for values appended later. */
        ALL,
        /** Complete the sink as soon as a read finds nothing. */
        ONLY_HISTORY
    }

    /**
     * Creates a store with the default roll cycle.
     *
     * @param path path handed to the Chronicle Queue builder
     * @param serializer converts a value to the bytes written to the queue
     * @param deserializer converts bytes read from the queue back to a value
     */
    public ChronicleStore(String path, Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        // The explicit <T> witness is required: without it the receiver expression infers Object
        // and the typed serializer/deserializer arguments are rejected by the compiler.
        this(ChronicleStore.<T>newBuilder().path(path).serializer(serializer).deserializer(deserializer));
    }

    private ChronicleStore(ChronicleStoreBuilder<T> builder) {
        this.serializer = builder.serializer;
        this.deserializer = builder.deserializer;
        this.rollCycle = builder.rollCycle;
        this.queue = SingleChronicleQueueBuilder.binary(builder.path).rollCycle(this.rollCycle).build();
    }

    /** Closes the underlying queue. */
    void close() {
        this.queue.close();
    }

    /**
     * @param <BT> type of the values handled by the store being built
     * @return a new builder
     */
    public static <BT> ChronicleStoreBuilder<BT> newBuilder() {
        return new ChronicleStoreBuilder<>();
    }

    /**
     * Subscribes to the publisher and writes every emitted value to the queue. Errors emitted by
     * the publisher are logged.
     *
     * @param publisher source of the values to store
     * @return handle that cancels the subscription
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public Disposable store(Publisher<T> publisher) {
        // NOTE(review): the appender is acquired on the calling thread but used on whichever
        // thread the publisher emits on - confirm this is safe for the Chronicle Queue version in use.
        ExcerptAppender appender = this.queue.acquireAppender();
        return Flux.from(publisher)
                .doOnError(error -> LOGGER.error("Error received", error))
                .subscribe(value -> storeValue(appender, value));
    }

    /**
     * Writes a single value to the queue.
     *
     * @param item value to store
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public void store(T item) {
        storeValue(this.queue.acquireAppender(), item);
    }

    /** Serializes the value and appends it to the queue as a length-prefixed byte array. */
    private void storeValue(ExcerptAppender appender, T value) {
        byte[] bytes = this.serializer.apply(value);
        appender.writeBytes(out -> {
            out.writeInt(bytes.length).write(bytes);
        });
    }

    /**
     * Polling loop run on the tailer thread: pushes queue values into the sink while downstream
     * has demand, until the sink is cancelled or, in {@link ReaderType#ONLY_HISTORY} mode, the
     * queue has nothing more to read.
     *
     * @param tailer tailer to read from
     * @param sink sink receiving the deserialized values
     * @param readerType behaviour when a read finds no value
     * @param deleteAfterRead whether to delete the file of a cycle once the tailer has left it
     */
    private void readTailer(ExcerptTailer tailer, FluxSink<T> sink, ReaderType readerType, boolean deleteAfterRead) {
        int previousCycle = 0;
        // Tracked explicitly so the loop ends after complete() without depending on
        // isCancelled() reporting a completed sink.
        boolean completed = false;
        try {
            while (!completed && !sink.isCancelled()) {
                if (sink.requestedFromDownstream() > 0) {
                    boolean present = tailer.readBytes(bytes -> readAndSendValue(sink, bytes));
                    if (!present) {
                        if (readerType == ReaderType.ONLY_HISTORY) {
                            sink.complete();
                            completed = true;
                        } else {
                            pause(EMPTY_QUEUE_PAUSE_MS);
                        }
                    }
                } else {
                    pause(NO_DEMAND_PAUSE_MS);
                }
                int cycle = this.rollCycle.toCycle(tailer.index());
                if (cycle != previousCycle) {
                    if (deleteAfterRead) {
                        deleteFile(previousCycle);
                    }
                    previousCycle = cycle;
                }
            }
        } catch (Exception e) {
            // NOTE(review): the failure is only logged; the sink is not terminated here.
            LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e);
        }
    }

    /** Sleeps for the given duration, restoring the interrupt flag if the sleep is interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Deletes the queue file backing the given cycle, if the queue knows one. */
    private void deleteFile(int cycle) {
        WireStore wireStore = this.queue.storeForCycle(cycle, 0L, false);
        if (wireStore == null) {
            LOGGER.trace("wirestore is null for cycle {}", cycle);
            return;
        }
        File file = wireStore.file();
        if (file == null) {
            LOGGER.error("Could not find file for cycle {}", cycle);
            return;
        }
        deleteWireStore(file);
    }

    /** Attempts to delete the file and logs the outcome; never throws. */
    private void deleteWireStore(File file) {
        try {
            logDeletionResult(file, file.delete());
        } catch (Exception e) {
            LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e);
        }
    }

    /** Logs at trace level on success and at error level on failure. */
    private void logDeletionResult(File file, boolean deleted) {
        if (deleted) {
            LOGGER.trace("file {} deleted after read", file.getAbsolutePath());
        } else {
            LOGGER.error("Could not delete file {}", file.getAbsolutePath());
        }
    }

    /** Reads one length-prefixed byte array, deserializes it and emits the value to the sink. */
    private void readAndSendValue(FluxSink<T> sink, BytesIn bytesIn) {
        byte[] payload = new byte[bytesIn.readInt()];
        bytesIn.read(payload);
        sink.next(this.deserializer.apply(payload));
    }

    /**
     * Streams the values of the queue and keeps polling for values appended later.
     *
     * @param deleteAfterRead whether to delete the file of a cycle once the tailer has left it
     * @return flux that starts a new tailer thread for each subscription
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<T> retrieveAll(boolean deleteAfterRead) {
        return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
    }

    /** Creates a fresh tailer on the queue and starts reading it on a new thread. */
    private void launchTailer(FluxSink<T> sink, ReaderType readerType, boolean deleteAfterRead) {
        launchTailer(sink, this.queue.createTailer(), readerType, deleteAfterRead);
    }

    /** Starts a daemon thread, named after the queue file, that runs the tailing loop. */
    private void launchTailer(FluxSink<T> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
        String threadName = "ChronicleStoreRetrieve_" + tailer.queue().file().getAbsolutePath();
        Thread reader = new Thread(() -> readTailer(tailer, sink, readerType, deleteAfterRead), threadName);
        reader.setDaemon(true);
        reader.start();
    }

    /**
     * Streams the values of the queue and completes once a read finds nothing more.
     * Files are never deleted by this method.
     *
     * @return flux that starts a new tailer thread for each subscription
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<T> retrieveHistory() {
        return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false));
    }

    /**
     * Streams only the values appended after this method was called: the tailer is created and
     * moved to the end of the queue immediately, not at subscription time.
     *
     * @return flux that starts a new thread reading that tailer for each subscription
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public Flux<T> retrieveNewValues() {
        ExcerptTailer tailer = this.queue.createTailer().toEnd();
        return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false));
    }

    /**
     * Wraps the history stream in a {@link ReplayFlux}.
     *
     * @param function passed through to the {@link ReplayFlux} constructor together with the
     *     deferred history flux
     * @return the replay flux
     */
    @Override // ch.streamly.chronicle.flux.FluxStore
    public ReplayFlux<T> replayHistory(Function<T, Long> function) {
        return new ReplayFlux<>(Flux.defer(this::retrieveHistory), function);
    }
}
