/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.io.kafka;

import cz.o2.proxima.internal.com.google.common.base.MoreObjects;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OffsetCommitter<ID> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(OffsetCommitter.class);
    private final Map<ID, NavigableMap<Long, OffsetMeta>> waitingOffsets = Collections.synchronizedMap(new HashMap());
    private final long stateCommitWarningMillis;
    private final long autoCommitMs;
    private long lastReportedStaleOffset = 0L;

    public OffsetCommitter() {
        this(60000L, Long.MAX_VALUE);
    }

    public OffsetCommitter(long staleCommitWarningMillis, long autoCommitMs) {
        this.stateCommitWarningMillis = staleCommitWarningMillis;
        this.autoCommitMs = autoCommitMs;
    }

    public void register(ID id, long offset, int numActions, Callback commit) {
        NavigableMap current = this.waitingOffsets.computeIfAbsent(id, tmp -> Collections.synchronizedNavigableMap(new TreeMap()));
        log.debug("Registered offset {} for ID {} with {} actions", new Object[]{offset, id, numActions});
        current.put(offset, new OffsetMeta(numActions, commit));
        if (numActions == 0) {
            this.checkCommittable(id, current);
        }
    }

    public void confirm(ID id, long offset) {
        Map current = this.waitingOffsets.get(id);
        log.debug("Confirming processing of offset {} with ID {}", (Object)offset, id);
        if (current != null) {
            OffsetMeta meta = (OffsetMeta)current.get(offset);
            if (meta != null) {
                meta.decrement();
            }
            this.checkCommittable(id, current);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkCommittable(ID id, Map<Long, OffsetMeta> current) {
        Map<Long, OffsetMeta> map = current;
        synchronized (map) {
            ArrayList<Map.Entry<Long, OffsetMeta>> committable = new ArrayList<Map.Entry<Long, OffsetMeta>>();
            for (Map.Entry<Long, OffsetMeta> e2 : current.entrySet()) {
                long now;
                long age = e2.getValue().getMillisAge();
                if (age > this.autoCommitMs) {
                    committable.add(e2);
                    log.warn("Auto adding offset {} of ID {} to comittable map due to age {} ms. The commit might have been lost. Verify your commit logic to remove this warning.", new Object[]{e2.getKey(), id, age});
                    continue;
                }
                if (e2.getValue().getActions() <= 0) {
                    committable.add(e2);
                    log.debug("Added offset {} of ID {} to committable map.", (Object)e2.getKey(), id);
                    continue;
                }
                if (age > this.stateCommitWarningMillis && (now = System.currentTimeMillis()) - this.lastReportedStaleOffset > 5000L) {
                    this.lastReportedStaleOffset = now;
                    log.warn("Offset {} ID {} was not committed in {} ms ({} actions missing). Please verify your commit logic!", new Object[]{e2.getKey(), id, age, e2.getValue().getActions()});
                    break;
                }
                log.debug("Waiting for still non-committed offset {} in {}, {} actions missing", new Object[]{e2.getKey(), id, e2.getValue().getActions()});
                break;
            }
            if (!committable.isEmpty()) {
                Map.Entry toCommit = (Map.Entry)committable.get(committable.size() - 1);
                committable.forEach(e -> current.remove(e.getKey()));
                ((OffsetMeta)toCommit.getValue()).getCommit().apply();
            }
        }
    }

    public void clear(ID id, long offset) {
        Map current = this.waitingOffsets.get(id);
        if (current != null) {
            current.remove(offset);
        }
    }

    public void clear() {
        this.waitingOffsets.clear();
    }

    public String toString() {
        return MoreObjects.toStringHelper(this.getClass()).add("waitingOffsets", this.waitingOffsets).toString();
    }

    static class OffsetMeta {
        AtomicInteger actions;
        Callback commit;
        final long createdMs = System.currentTimeMillis();

        OffsetMeta(int actions, Callback commit) {
            this.actions = new AtomicInteger(actions);
            this.commit = commit;
        }

        int getActions() {
            return this.actions.get();
        }

        long getMillisAge() {
            return System.currentTimeMillis() - this.createdMs;
        }

        void decrement() {
            this.actions.decrementAndGet();
        }

        public String toString() {
            return "OffsetMeta(actions=" + this.actions + ")";
        }

        @Generated
        public Callback getCommit() {
            return this.commit;
        }

        @Generated
        public long getCreatedMs() {
            return this.createdMs;
        }
    }

    @FunctionalInterface
    public static interface Callback {
        public void apply();
    }
}

