/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.storage;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetUtils;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers partition-to-offset mappings and writes them to an {@link OffsetBackingStore} in
 * discrete flushes.
 *
 * <p>A flush is a two-step protocol: {@link #beginFlush()} snapshots the buffered offsets, then
 * {@link #doFlush(Callback)} serializes the snapshot and hands it to the backing store.
 * {@link #cancelFlush()} abandons an in-progress flush and puts the snapshot back into the buffer.
 * Offsets recorded via {@link #offset(Map, Map)} while a flush is in progress go into a fresh
 * buffer and are not part of that flush.
 *
 * <p>All access to the buffered state is guarded by this instance's monitor.
 */
public class OffsetStorageWriter {
    private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
    private final OffsetBackingStore backingStore;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final String namespace;
    // Offsets recorded since the last snapshot; replaced (not cleared) when a flush begins.
    private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
    // Snapshot currently being flushed; null means no flush is in progress.
    private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
    // Bumped whenever a flush completes or is cancelled, so that completion callbacks belonging
    // to an older flush can be recognised and ignored.
    private long currentFlushId = 0L;

    /**
     * @param backingStore   store that receives the serialized offsets
     * @param namespace      passed to both converters and embedded in every serialized key
     * @param keyConverter   converter used to serialize {@code [namespace, partition]} keys
     * @param valueConverter converter used to serialize offset values
     */
    public OffsetStorageWriter(OffsetBackingStore backingStore, String namespace, Converter keyConverter, Converter valueConverter) {
        this.backingStore = backingStore;
        this.namespace = namespace;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /**
     * Records an offset for a partition, replacing any offset previously buffered for the same
     * partition. The entry is only buffered; nothing is written until a flush is performed.
     *
     * @param partition the partition the offset belongs to
     * @param offset    the offset to buffer for that partition
     */
    public synchronized void offset(Map<String, Object> partition, Map<String, Object> offset) {
        data.put(partition, offset);
    }

    /** Returns true while a snapshot taken by {@link #beginFlush()} is still outstanding. */
    private boolean flushing() {
        return toFlush != null;
    }

    /**
     * Starts a flush by moving the buffered offsets into the flush snapshot and installing a new,
     * empty buffer.
     *
     * @return {@code true} if a snapshot was taken and {@link #doFlush(Callback)} should follow;
     *         {@code false} if there was nothing buffered and no flush was started
     * @throws ConnectException if a previous flush is still in progress
     */
    public synchronized boolean beginFlush() {
        if (flushing()) {
            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the framework should not allow this");
            throw new ConnectException("OffsetStorageWriter is already flushing");
        }
        if (data.isEmpty()) {
            return false;
        }
        assert !flushing();
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    /**
     * Serializes the current flush snapshot and submits it to the backing store.
     *
     * <p>Serialization happens while holding this instance's monitor; the call into the backing
     * store is made after the monitor has been released.
     *
     * <p>If anything is thrown while serializing (including a {@code NullPointerException} because
     * no snapshot exists), the failure is logged, reported to {@code callback} if one was given,
     * and {@code null} is returned. The snapshot is left in place in that case; this method does
     * not cancel the flush itself.
     *
     * @param callback invoked with the outcome of the write, but only if this flush is still the
     *                 current one when the backing store reports completion; may be {@code null}
     * @return the future returned by the backing store, or {@code null} if serialization failed
     */
    public Future<Void> doFlush(Callback<Void> callback) {
        final long flushId;
        final Map<ByteBuffer, ByteBuffer> offsetsSerialized;

        synchronized (this) {
            // Remember which flush this write belongs to so a late completion can be detected.
            flushId = currentFlushId;

            try {
                offsetsSerialized = new HashMap<>(toFlush.size());
                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
                    OffsetUtils.validateFormat(entry.getKey());
                    OffsetUtils.validateFormat(entry.getValue());
                    // The serialized key is the pair [namespace, partition].
                    byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
                    ByteBuffer keyBuffer = key != null ? ByteBuffer.wrap(key) : null;
                    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
                    ByteBuffer valueBuffer = value != null ? ByteBuffer.wrap(value) : null;
                    offsetsSerialized.put(keyBuffer, valueBuffer);
                }
            } catch (Throwable t) {
                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit offsets under namespace {}. This likely won't recover unless the unserializable partition or offset information is overwritten.", namespace);
                log.error("Cause of serialization failure:", t);
                // The success path tolerates a null callback, so the failure path must too;
                // otherwise a NullPointerException here would replace the real error.
                if (callback != null) {
                    callback.onCompletion(t, null);
                }
                return null;
            }

            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
        }

        return backingStore.set(offsetsSerialized, (error, result) -> {
            boolean isCurrent = handleFinishWrite(flushId, error, result);
            if (isCurrent && callback != null) {
                callback.onCompletion(error, result);
            }
        });
    }

    /**
     * Abandons the in-progress flush, if any. Offsets buffered since the snapshot was taken are
     * layered on top of the snapshot (so they win for any partition present in both) and the
     * merged map becomes the buffer again. The flush id is advanced so that a completion reported
     * later for the abandoned flush is ignored. Does nothing when no flush is in progress.
     */
    public synchronized void cancelFlush() {
        if (flushing()) {
            toFlush.putAll(data);
            data = toFlush;
            ++currentFlushId;
            toFlush = null;
        }
    }

    /**
     * Handles the backing store's completion notification for the flush identified by
     * {@code flushId}.
     *
     * @param flushId the value of the flush id captured when the write was submitted
     * @param error   the failure reported by the backing store, or {@code null} on success
     * @param result  unused
     * @return {@code false} if the notification belongs to a flush that is no longer current (and
     *         was therefore ignored); {@code true} if it was applied, in which case a failure
     *         cancels the flush and a success clears the snapshot
     */
    private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
        if (flushId != currentFlushId) {
            return false;
        }
        if (error != null) {
            cancelFlush();
        } else {
            ++currentFlushId;
            toFlush = null;
        }
        return true;
    }
}

