package krati.retention;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import krati.core.StoreConfig;
import krati.core.segment.SegmentFactory;
import krati.core.segment.WriteBufferSegmentFactory;
import krati.retention.clock.Clock;
import krati.retention.clock.Occurred;
import krati.retention.policy.RetentionPolicy;
import krati.store.BytesDB;
import krati.util.DaemonThreadFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/krati-0.4.9.jar:krati/retention/SimpleRetention.class */
public class SimpleRetention<T> implements Retention<T> {
    private static final Logger _logger = Logger.getLogger(SimpleRetention.class);
    private final int _id;
    private final File _homeDir;
    private final BytesDB _store;
    private final int _eventBatchSize;
    private final EventBatchSerializer<T> _eventBatchSerializer;
    private final ConcurrentLinkedQueue<EventBatchCursor> _retentionQueue;
    private final RetentionPolicy _retentionPolicy;
    private final SimpleRetention<T>.RetentionPolicyApply _retentionPolicyApply;
    private final ScheduledExecutorService _retentionPolicyExecutor;
    private volatile EventBatch<T> _batch;
    private volatile EventBatch<T> _lastBatch;
    private volatile EventBatchCursor _lastBatchCursor;
    private final Lock _batchLock;
    private RetentionFlushListener _flushListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/krati-0.4.9.jar:krati/retention/SimpleRetention$RetentionPolicyApply.class */
    public class RetentionPolicyApply implements Runnable {
        private RetentionPolicyApply() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Collection<EventBatchCursor> apply = SimpleRetention.this._retentionPolicy.apply(SimpleRetention.this._retentionQueue);
            if (apply == null || apply.size() <= 0) {
                return;
            }
            for (EventBatchCursor eventBatchCursor : apply) {
                int lookup = eventBatchCursor.getLookup();
                try {
                    if (SimpleRetention.this._retentionPolicy.isCallback()) {
                        try {
                            SimpleRetention.this._retentionPolicy.applyCallbackOn(SimpleRetention.this._eventBatchSerializer.deserialize(SimpleRetention.this._store.get(lookup)));
                        } catch (Exception e) {
                            if (SimpleRetention.this._store.isOpen()) {
                                SimpleRetention._logger.error("Failed to apply callback on cursor: " + eventBatchCursor.getHeader().getOrigin(), e);
                            }
                        }
                    }
                    SimpleRetention.this._store.set(lookup, null, SimpleRetention.this.getOffset());
                    SimpleRetention._logger.info("Removed EventBatch: " + eventBatchCursor.getHeader().getOrigin());
                } catch (Exception e2) {
                    if (SimpleRetention.this._store.isOpen()) {
                        SimpleRetention._logger.error("Failed to apply retention policy on cursor " + lookup, e2.getCause());
                    }
                }
            }
        }
    }

    /**
     * Creates a retention from a {@link RetentionConfig}. The store lives in
     * the {@code "retention"} sub-directory of the configured home directory
     * and events are serialized with a {@code SimpleEventBatchSerializer}
     * built from the configured value and clock serializers.
     *
     * @param retentionConfig the configuration to read all settings from
     * @throws Exception if the delegated constructor fails
     */
    public SimpleRetention(RetentionConfig<T> retentionConfig) throws Exception {
        this(
                retentionConfig.getId(),
                new File(retentionConfig.getHomeDir(), "retention"),
                retentionConfig.getRetentionInitialSize(),
                retentionConfig.getRetentionPolicy(),
                new SimpleEventBatchSerializer(
                        retentionConfig.getEventValueSerializer(),
                        retentionConfig.getEventClockSerializer()),
                retentionConfig.getBatchSize(),
                retentionConfig.getNumSyncBatchs(),
                retentionConfig.getRetentionSegmentFactory(),
                retentionConfig.getRetentionSegmentFileSizeMB());
    }

    /**
     * Creates a retention with an initial size of 100000, a
     * {@link WriteBufferSegmentFactory} and a segment file size of 32 (MB).
     *
     * @param i                    the retention id
     * @param file                 the home directory of the store
     * @param retentionPolicy      the policy deciding which batches to discard
     * @param eventBatchSerializer the serializer for event batches
     * @param i2                   the requested event batch size
     * @throws Exception if the delegated constructor fails
     */
    public SimpleRetention(int i, File file, RetentionPolicy retentionPolicy, EventBatchSerializer<T> eventBatchSerializer, int i2) throws Exception {
        this(
                i,
                file,
                100000,
                retentionPolicy,
                eventBatchSerializer,
                i2,
                new WriteBufferSegmentFactory(),
                32);
    }

    /**
     * Creates a retention that passes 10 as the number of sync batches to the
     * store configuration.
     *
     * @param i                    the retention id
     * @param file                 the home directory of the store
     * @param i2                   the initial size passed to the store configuration
     * @param retentionPolicy      the policy deciding which batches to discard
     * @param eventBatchSerializer the serializer for event batches
     * @param i3                   the requested event batch size
     * @param segmentFactory       the segment factory of the store
     * @param i4                   the segment file size in MB
     * @throws Exception if the delegated constructor fails
     */
    public SimpleRetention(int i, File file, int i2, RetentionPolicy retentionPolicy, EventBatchSerializer<T> eventBatchSerializer, int i3, SegmentFactory segmentFactory, int i4) throws Exception {
        this(
                i,
                file,
                i2,
                retentionPolicy,
                eventBatchSerializer,
                i3,
                10,
                segmentFactory,
                i4);
    }

    /**
     * Primary constructor: wires all collaborators, opens the backing
     * {@link BytesDB} and calls {@link #init()}.
     *
     * @param i                    the retention id
     * @param file                 the home directory of the store
     * @param i2                   the initial size passed to the store configuration
     * @param retentionPolicy      the policy deciding which batches to discard
     * @param eventBatchSerializer the serializer for event batches
     * @param i3                   the requested event batch size; values below 100 are raised to 100
     * @param i4                   the number of sync batches of the store
     * @param segmentFactory       the segment factory of the store
     * @param i5                   the segment file size in MB
     * @throws Exception if the store cannot be created or initialization fails
     */
    protected SimpleRetention(int i, File file, int i2, RetentionPolicy retentionPolicy, EventBatchSerializer<T> eventBatchSerializer, int i3, int i4, SegmentFactory segmentFactory, int i5) throws Exception {
        // Identity and configuration supplied by the caller.
        _id = i;
        _homeDir = file;
        _retentionPolicy = retentionPolicy;
        _eventBatchSerializer = eventBatchSerializer;
        _eventBatchSize = Math.max(100, i3);

        // Internal machinery.
        _retentionQueue = new ConcurrentLinkedQueue<>();
        _retentionPolicyApply = new RetentionPolicyApply();
        _retentionPolicyExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
        _batchLock = new ReentrantLock();

        // Mutable state starts out empty; init() creates the first batch.
        _batch = null;
        _lastBatch = null;
        _lastBatchCursor = null;
        _flushListener = null;

        // Backing store: every batch is written as a single store entry.
        StoreConfig config = new StoreConfig(file, i2);
        config.setBatchSize(1);
        config.setNumSyncBatches(i4);
        config.setSegmentFileSizeMB(i5);
        config.setSegmentFactory(segmentFactory);
        _store = new BytesDB(config);

        init();
    }

    /**
     * Rebuilds the in-memory state from the store: syncs the store, opens a
     * cursor for every slot that has data, queues the cursors in ascending
     * origin order, creates the current batch right after the last stored
     * batch and schedules the retention policy task.
     *
     * <p>Slots whose header cannot be deserialized are logged and skipped.
     *
     * @throws IOException if syncing the store fails
     */
    protected void init() throws IOException {
        this._store.sync();
        int capacity = this._store.capacity();
        List<EventBatchCursor> cursors = new ArrayList<EventBatchCursor>(capacity / 2);
        for (int index = 0; index < capacity; index++) {
            if (this._store.hasData(index)) {
                try {
                    cursors.add(new SimpleEventBatchCursor(index, this._eventBatchSerializer.deserializeHeader(this._store.get(index))));
                } catch (Exception e) {
                    _logger.error("Failed to open a cursor", e);
                }
            }
        }
        Clock clock = Clock.ZERO;
        long nextOrigin = 0;
        int size = cursors.size();
        if (size > 0) {
            Collections.sort(cursors, new Comparator<EventBatchCursor>() {
                @Override // java.util.Comparator
                public int compare(EventBatchCursor left, EventBatchCursor right) {
                    // Compare the long origins directly: narrowing their difference
                    // to int can overflow and flip the sign of the result.
                    return Long.compare(left.getHeader().getOrigin(), right.getHeader().getOrigin());
                }
            });
            for (EventBatchCursor cursor : cursors) {
                this._retentionQueue.add(cursor);
            }
            // The new batch continues where the last stored batch ends.
            EventBatchHeader lastHeader = cursors.get(size - 1).getHeader();
            nextOrigin = lastHeader.getOrigin() + lastHeader.getSize();
            clock = lastHeader.getMaxClock();
        }
        this._batch = nextEventBatch(nextOrigin, clock);
        this._lastBatch = null;
        this._lastBatchCursor = null;
        scheduleRetentionPolicy();
        _logger.info("init " + size + " batches");
        _logger.info("init position=" + getPosition());
        _logger.info("init batch=" + this._batch);
    }

    /**
     * Creates a new, empty event batch and logs its origin.
     *
     * @param j     the origin of the new batch
     * @param clock the clock passed to the new batch
     * @return a {@code SimpleEventBatch} sized to the configured event batch size
     */
    protected EventBatch<T> nextEventBatch(long j, Clock clock) {
        SimpleEventBatch<T> created = new SimpleEventBatch<T>(j, clock, _eventBatchSize);
        _logger.info("Created EventBatch: " + created.getOrigin());
        return created;
    }

    /**
     * Schedules the retention policy task on the policy executor: first run
     * after 1 second, then with a fixed delay of 5 seconds between runs.
     */
    protected void scheduleRetentionPolicy() {
        final long initialDelaySeconds = 1L;
        final long delaySeconds = 5L;
        _retentionPolicyExecutor.scheduleWithFixedDelay(
                _retentionPolicyApply, initialDelaySeconds, delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * @return the home directory this retention was constructed with
     */
    public final File getHomeDir() {
        return _homeDir;
    }

    /**
     * @return the effective event batch size (at least 100, as enforced by the constructor)
     */
    public final int getEventBatchSize() {
        return _eventBatchSize;
    }

    /**
     * @return the serializer used to read and write event batches
     */
    public final EventBatchSerializer<T> getEventBatchSerializer() {
        return _eventBatchSerializer;
    }

    /**
     * @return the listener notified before and after a batch is flushed, or {@code null} if none is set
     */
    public final RetentionFlushListener getFlushListener() {
        return _flushListener;
    }

    /**
     * Sets the listener notified before and after a batch is flushed.
     *
     * @param retentionFlushListener the listener, or {@code null} to disable notifications
     */
    public final void setFlushListener(RetentionFlushListener retentionFlushListener) {
        _flushListener = retentionFlushListener;
    }

    /**
     * @return the id this retention was constructed with
     */
    @Override // krati.retention.Retention
    public final int getId() {
        return _id;
    }

    /**
     * Returns the origin of the cursor at the head of the retention queue, or
     * the origin of the current batch when the queue is empty.
     */
    @Override // krati.retention.Retention
    public long getOrigin() {
        long currentOrigin = _batch.getOrigin();
        EventBatchCursor head = _retentionQueue.peek();
        if (head != null) {
            return head.getHeader().getOrigin();
        }
        return currentOrigin;
    }

    /**
     * Returns the origin of the current batch plus its size.
     *
     * <p>The volatile {@code _batch} reference is read once so that origin and
     * size are taken from the same batch even if a writer replaces the current
     * batch concurrently.
     */
    @Override // krati.retention.Retention
    public long getOffset() {
        EventBatch<T> current = this._batch;
        return current.getOrigin() + current.getSize();
    }

    /**
     * Returns the min clock of the cursor at the head of the retention queue,
     * or the min clock of the current batch when the queue is empty.
     */
    @Override // krati.retention.Retention
    public Clock getMinClock() {
        Clock currentMin = _batch.getMinClock();
        EventBatchCursor head = _retentionQueue.peek();
        if (head != null) {
            return head.getHeader().getMinClock();
        }
        return currentMin;
    }

    /**
     * @return the max clock of the current batch
     */
    @Override // krati.retention.Retention
    public Clock getMaxClock() {
        return _batch.getMaxClock();
    }

    /**
     * Looks up the clock recorded at the given offset.
     *
     * <p>The current and the last batch are consulted under the batch lock;
     * the lock is released exactly once, in a {@code finally} block, before
     * the stored batches referenced by the retention queue are scanned.
     *
     * @param j the offset to look up
     * @return {@code null} if the offset is before {@link #getOrigin()} or no
     *         batch yields a clock for it; {@link #getMaxClock()} if the offset
     *         is at or beyond {@link #getOffset()}; otherwise the clock found
     */
    @Override // krati.retention.Retention
    public Clock getClock(long j) {
        if (j < getOrigin()) {
            return null;
        }
        if (j >= getOffset()) {
            return getMaxClock();
        }

        // In-memory batches first.
        this._batchLock.lock();
        try {
            Clock clock = this._batch.getClock(j);
            if (clock != null) {
                return clock;
            }
            EventBatch<T> previous = this._lastBatch;
            if (previous != null) {
                clock = previous.getClock(j);
                if (clock != null) {
                    return clock;
                }
            }
        } finally {
            this._batchLock.unlock();
        }

        // Then the stored batches, without holding the lock.
        int visited = 0;
        for (EventBatchCursor cursor : this._retentionQueue) {
            EventBatchHeader header = cursor.getHeader();
            long origin = header.getOrigin();
            if (origin <= j) {
                if (j >= origin + header.getSize()) {
                    // The offset lies beyond this batch; try the next cursor.
                    continue;
                }
                try {
                    EventBatch<?> stored = (EventBatch<?>) this._eventBatchSerializer.deserialize(this._store.get(cursor.getLookup()));
                    Clock clock = stored.getClock(j);
                    if (clock != null) {
                        return clock;
                    }
                } catch (Exception e) {
                    // Keep scanning, but retain the cause for diagnosis.
                    _logger.warn(e.getMessage(), e);
                }
            } else if (visited == 0) {
                return null;
            }
            visited++;
        }
        return null;
    }

    /**
     * @return the effective event batch size; same value as {@link #getEventBatchSize()}
     */
    @Override // krati.retention.Retention
    public final int getBatchSize() {
        return _eventBatchSize;
    }

    /**
     * @return the retention policy this retention was constructed with
     */
    @Override // krati.retention.Retention
    public final RetentionPolicy getRetentionPolicy() {
        return _retentionPolicy;
    }

    /**
     * @return a position built from this retention's id, its current offset
     *         and the max clock of the current batch
     */
    @Override // krati.retention.Retention, krati.retention.RetentionClient
    public final Position getPosition() {
        int id = getId();
        long offset = getOffset();
        Clock maxClock = getMaxClock();
        return new SimplePosition(id, offset, maxClock);
    }

    /**
     * Resolves the position that corresponds to the given clock.
     *
     * <p>The current and the last batch are consulted under the batch lock.
     * The lock is released exactly once, in a {@code finally} block, before the
     * stored batches referenced by the retention queue are scanned, so a
     * failure during the scan is propagated as-is.
     *
     * @param clock the clock to resolve
     * @return the origin position if the clock equals the min clock; the
     *         current position if the clock is after the max clock;
     *         {@code null} if the clock is before or concurrent to the min
     *         clock or no batch resolves it; otherwise the position found
     */
    @Override // krati.retention.Retention, krati.retention.RetentionClient
    public Position getPosition(Clock clock) {
        Occurred relation = clock.compareTo(getMinClock());
        if (relation == Occurred.EQUICONCURRENTLY) {
            return new SimplePosition(getId(), getOrigin(), getMinClock());
        }
        if (relation == Occurred.BEFORE || relation == Occurred.CONCURRENTLY) {
            return null;
        }
        if (clock.after(getMaxClock())) {
            return getPosition();
        }

        // In-memory batches first.
        this._batchLock.lock();
        try {
            EventBatch<T> current = this._batch;
            long offset = current.getOffset(clock);
            if (offset >= 0) {
                return new SimplePosition(getId(), offset, current.getClock(offset));
            }
            EventBatch<T> previous = this._lastBatch;
            if (previous != null) {
                if (previous.getMaxClock().before(clock)) {
                    if (current.getMinClock().compareTo(clock) == Occurred.EQUICONCURRENTLY) {
                        return new SimplePosition(getId(), current.getOrigin(), current.getMinClock());
                    }
                    long end = previous.getOrigin() + previous.getSize();
                    return new SimplePosition(getId(), end, previous.getClock(end));
                }
                offset = previous.getOffset(clock);
                if (offset >= 0) {
                    return new SimplePosition(getId(), offset, previous.getClock(offset));
                }
            }
        } finally {
            this._batchLock.unlock();
        }

        // Then the stored batches, without holding the lock.
        int visited = 0;
        for (EventBatchCursor cursor : this._retentionQueue) {
            EventBatchHeader header = cursor.getHeader();
            Occurred start = header.getMinClock().compareTo(clock);
            if (start == Occurred.EQUICONCURRENTLY) {
                if (visited == 0) {
                    return null;
                }
                return new SimplePosition(getId(), header.getOrigin(), header.getMinClock());
            }
            if (start == Occurred.BEFORE) {
                if (clock.after(header.getMaxClock())) {
                    // The clock lies beyond this batch; try the next cursor.
                    continue;
                }
                try {
                    EventBatch<?> stored = (EventBatch<?>) this._eventBatchSerializer.deserialize(this._store.get(cursor.getLookup()));
                    long found = stored.getOffset(clock);
                    if (found >= 0) {
                        return new SimplePosition(getId(), found, stored.getClock(found));
                    }
                    continue;
                } catch (Exception e) {
                    // Keep scanning, but retain the cause for diagnosis.
                    _logger.warn(e.getMessage(), e);
                }
            } else if (visited == 0) {
                return null;
            }
            visited++;
        }
        return null;
    }

    /**
     * Reads events starting at the given position into {@code list}.
     *
     * <p>The current batch is tried first, then the last batch, then the
     * stored batches referenced by the retention queue. Stored batches that
     * fail to load are logged and ignored.
     *
     * @param position the position to read from
     * @param list     the list receiving the events
     * @return the position after the read, or {@code null} if the position is
     *         before {@link #getOrigin()}, is indexed, or no stored batch
     *         advances past it
     */
    @Override // krati.retention.Retention, krati.retention.RetentionClient
    public Position get(Position position, List<Event<T>> list) {
        if (position.getOffset() < getOrigin()) {
            return null;
        }
        if (position.isIndexed()) {
            return null;
        }

        EventBatch<T> current = _batch;
        if (current.getOrigin() <= position.getOffset()) {
            long next = current.get(position.getOffset(), list);
            Clock reached;
            if (position.getOffset() < next) {
                reached = current.getClock(next - 1);
            } else {
                reached = position.getClock();
            }
            return new SimplePosition(getId(), next, reached);
        }

        EventBatch<T> previous = _lastBatch;
        if (previous != null && previous.getOrigin() <= position.getOffset()) {
            long next = previous.get(position.getOffset(), list);
            Clock reached;
            if (position.getOffset() < next) {
                reached = previous.getClock(next - 1);
            } else {
                reached = position.getClock();
            }
            return new SimplePosition(getId(), next, reached);
        }

        int visited = 0;
        for (EventBatchCursor cursor : _retentionQueue) {
            if (cursor.getHeader().getOrigin() > position.getOffset()) {
                if (visited == 0) {
                    return null;
                }
                visited++;
                continue;
            }
            try {
                EventBatch stored = (EventBatch) _eventBatchSerializer.deserialize(_store.get(cursor.getLookup()));
                long next = stored.get(position.getOffset(), list);
                if (position.getOffset() < next) {
                    return new SimplePosition(getId(), next, stored.getClock(next - 1));
                }
                // Nothing read from this batch: move on without counting it.
                continue;
            } catch (Exception e) {
                _logger.warn("Ignored EventBatch: " + cursor.getHeader().getOrigin());
            }
            visited++;
        }
        return null;
    }

    /**
     * Appends an event to the current batch. If the current batch is full it
     * is first serialized into the store, queued for retention, remembered as
     * the last batch and replaced by a new batch that starts where it ended.
     *
     * @param event the event to append
     * @return the result of putting the event into the current batch
     * @throws Exception if serializing or storing a full batch fails
     */
    @Override // krati.retention.Retention
    public synchronized boolean put(Event<T> event) throws Exception {
        if (_batch.isFull()) {
            _batch.setCompletionTime(System.currentTimeMillis());
            byte[] bytes = _eventBatchSerializer.serialize(_batch);

            if (_flushListener != null) {
                _flushListener.beforeFlush(_batch);
            }
            int lookup = _store.add(bytes, getOffset());
            if (_flushListener != null) {
                _flushListener.afterFlush(_batch);
            }

            SimpleEventBatchCursor cursor = new SimpleEventBatchCursor(lookup, _batch.getHeader());
            _retentionQueue.offer(cursor);

            // Swap the batches under the lock that readers take.
            _batchLock.lock();
            try {
                _lastBatch = _batch;
                _lastBatchCursor = cursor;
                _batch = nextEventBatch(_batch.getOrigin() + _batch.getSize(), event.getClock());
            } finally {
                _batchLock.unlock();
            }
        }
        return _batch.put(event);
    }

    /**
     * @return {@code true} if the underlying store is open
     */
    @Override // krati.io.Closeable
    public boolean isOpen() {
        return _store.isOpen();
    }

    /**
     * Opens the underlying store and schedules the retention policy task.
     * Does nothing if the store is already open.
     *
     * <p>NOTE(review): {@code close()} shuts down the (final) retention policy
     * executor, so scheduling on it again after a close looks like it would be
     * rejected - confirm whether reopening a closed retention is supported.
     *
     * @throws IOException if the store cannot be opened
     */
    @Override // krati.io.Closeable
    public synchronized void open() throws IOException {
        if (!_store.isOpen()) {
            _store.open();
            scheduleRetentionPolicy();
        }
    }

    /**
     * Shuts down the retention policy executor and closes the underlying
     * store. Does nothing if the store is not open.
     *
     * @throws IOException if the store cannot be closed
     */
    @Override // krati.io.Closeable, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (!_store.isOpen()) {
            return;
        }
        _retentionPolicyExecutor.shutdown();
        _store.close();
    }

    /**
     * Persists the current batch. Nothing happens when the retention is not
     * open, the current batch is empty, or its events could be merged into the
     * last batch. Otherwise the batch is serialized into the store, queued for
     * retention, remembered as the last batch and replaced by a new batch.
     *
     * @throws IOException if persisting fails; other exceptions raised while
     *                     storing are wrapped in an {@code IOException}
     */
    @Override // krati.retention.Retention
    public synchronized void flush() throws IOException {
        if (!isOpen()) {
            return;
        }
        if (_batch.isEmpty()) {
            return;
        }
        if (mergeEventsToLastBatch()) {
            return;
        }

        _batch.setCompletionTime(System.currentTimeMillis());
        byte[] bytes = _eventBatchSerializer.serialize(_batch);
        if (_flushListener != null) {
            _flushListener.beforeFlush(_batch);
        }

        try {
            int lookup = _store.add(bytes, getOffset());
            if (_flushListener != null) {
                _flushListener.afterFlush(_batch);
            }

            SimpleEventBatchCursor cursor = new SimpleEventBatchCursor(lookup, _batch.getHeader());
            _retentionQueue.offer(cursor);

            // Swap the batches under the lock that readers take.
            _batchLock.lock();
            try {
                _lastBatch = _batch;
                _lastBatchCursor = cursor;
                _batch = nextEventBatch(_batch.getOrigin() + _batch.getSize(), _batch.getMaxClock());
            } finally {
                _batchLock.unlock();
            }
        } catch (Exception e) {
            if (e instanceof IOException) {
                throw (IOException) e;
            }
            throw new IOException(e);
        }
    }

    /**
     * Tries to append the events of the current batch to a copy of the last
     * batch and to overwrite the last batch's store entry with the result.
     *
     * <p>The merge is attempted only if a last batch exists and both batches
     * together fit into the configured event batch size. On success the merged
     * copy becomes the last batch and a new current batch is created.
     *
     * @return {@code true} if the events were merged; {@code false} if the
     *         merge was not applicable or was aborted by an exception
     * @throws IOException declared by the signature
     */
    protected boolean mergeEventsToLastBatch() throws IOException {
        if (_lastBatch == null) {
            return false;
        }
        if (_eventBatchSize < _lastBatch.getSize() + _batch.getSize()) {
            return false;
        }

        _batch.setCompletionTime(System.currentTimeMillis());
        if (_flushListener != null) {
            _flushListener.beforeFlush(_batch);
        }

        SimpleEventBatch<T> merged = ((SimpleEventBatch) _lastBatch).m164clone();
        try {
            for (Iterator<Event<T>> events = _batch.iterator(); events.hasNext(); ) {
                merged.put(events.next());
            }
            merged.setCompletionTime(_batch.getCompletionTime());
            _store.set(_lastBatchCursor.getLookup(), _eventBatchSerializer.serialize(merged), getOffset());
            if (_flushListener != null) {
                _flushListener.afterFlush(_batch);
            }

            // Publish the merged batch under the lock that readers take.
            _batchLock.lock();
            try {
                _lastBatch = merged;
                _lastBatchCursor.setHeader(merged.getHeader());
                _logger.info(_batch.getSize() + " events merged to EventBatch " + _lastBatchCursor.getLookup());
                _batch = nextEventBatch(_batch.getOrigin() + _batch.getSize(), _batch.getMaxClock());
            } finally {
                _batchLock.unlock();
            }
            return true;
        } catch (Exception e) {
            _logger.info("events merge aborted", e);
            return false;
        }
    }
}
