/*
 * Decompiled with CFR 0.152.
 */
package io.yupiik.kubernetes.operator.base.spi;

import io.yupiik.kubernetes.operator.base.impl.ObjectLike;
import io.yupiik.kubernetes.operator.base.spi.Operator;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public abstract class BulkingOperator<T extends ObjectLike>
extends Operator.Base<T> {
    private final int bulkSize;
    private final long timeout;
    private final ScheduledExecutorService scheduler;
    private final Clock clock;
    private final ReentrantLock lock = new ReentrantLock();
    private final List<Event<T>> buffer;
    private ScheduledFuture<?> flushingTask;
    private Instant nextFlush;

    public BulkingOperator(Class<T> resourceType, Operator.DefaultOperatorConfiguration configuration, int bulkSize, long timeout, ScheduledExecutorService scheduler, Clock clock) {
        super(resourceType, configuration);
        this.bulkSize = bulkSize;
        this.timeout = timeout;
        this.scheduler = scheduler;
        this.clock = clock;
        this.buffer = new ArrayList<Event<T>>(bulkSize);
    }

    protected void onBulk(List<Event<T>> events) {
    }

    @Override
    public CompletionStage<?> onStart() {
        this.scheduleFlush(this.clock.instant());
        return super.onStart();
    }

    @Override
    public void onStop() {
        this.lock.lock();
        try {
            if (this.flushingTask != null) {
                this.flushingTask.cancel(true);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    @Override
    public void onAdd(T resource) {
        this.onEvent(new Event<T>(Type.ADDED, resource));
    }

    @Override
    public void onModify(T resource) {
        this.onEvent(new Event<T>(Type.MODIFIED, resource));
    }

    @Override
    public void onDelete(T resource) {
        this.onEvent(new Event<T>(Type.DELETED, resource));
    }

    private void onEvent(Event<T> event) {
        this.lock.lock();
        try {
            this.buffer.add(event);
        }
        finally {
            this.lock.unlock();
        }
        if (!this.flush()) {
            Instant now = this.clock.instant();
            if (this.nextFlush == null || this.nextFlush.isBefore(now)) {
                this.scheduleFlush(now);
            }
        }
    }

    private void scheduleFlush(Instant now) {
        this.lock.lock();
        try {
            if (this.nextFlush != null && this.nextFlush.isAfter(now)) {
                return;
            }
            if (this.flushingTask != null) {
                this.flushingTask.cancel(false);
            }
            this.flushingTask = this.scheduler.schedule(this::flush, this.timeout, TimeUnit.MILLISECONDS);
            this.nextFlush = this.clock.instant().plusMillis(this.timeout);
        }
        finally {
            this.lock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean flush() {
        ArrayList<Event<T>> bulkable = null;
        this.lock.lock();
        try {
            if (this.buffer.size() >= this.bulkSize) {
                bulkable = new ArrayList<Event<T>>(this.buffer);
                this.buffer.clear();
            }
        }
        finally {
            this.lock.unlock();
        }
        if (bulkable == null || bulkable.isEmpty()) {
            return false;
        }
        try {
            this.onBulk(bulkable);
        }
        catch (RuntimeException re) {
            Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, re, re::getMessage);
        }
        finally {
            this.lock.lock();
            try {
                if (this.flushingTask != null) {
                    this.flushingTask.cancel(false);
                    this.flushingTask = null;
                }
            }
            finally {
                this.lock.unlock();
            }
        }
        return true;
    }

    protected record Event<T>(Type type, T resource) {
    }

    protected static enum Type {
        ADDED,
        DELETED,
        MODIFIED;

    }
}

