package io.streamthoughts.kafka.connect.filepulse.storage;

import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.class */
/**
 * A {@link StateBackingStore} that persists states as records written through a
 * {@link KafkaBasedLog} and keeps an in-memory view of the states consumed back from it.
 *
 * <p>Record keys have the form {@code <keyPrefix><groupId>.<stateName>}. A record with a
 * {@code null} value is treated as a removal of the named state.
 *
 * <p>The in-memory map is guarded by an internal lock; lifecycle transitions
 * ({@link #start()}, {@link #stop()}) are guarded by the instance monitor.
 *
 * @param <T> the type of the stored states
 */
public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStateBackingStore.class);

    /** Timeout applied when a synchronous put/remove re-reads the log to its end. */
    private static final Duration DEFAULT_READ_TO_END_TIMEOUT = Duration.ofSeconds(30);

    private final KafkaBasedLog<String, byte[]> kafkaLog;
    private final String groupId;
    private final StateSerde<T> serde;
    private final String keyPrefix;
    private final boolean consumerEnabled;
    private StateBackingStore.UpdateListener<T> updateListener;

    /** Guards {@link #states}. */
    private final Object lock = new Object();

    /** Offset of the last consumed record plus one, or -1 if nothing has been consumed yet. */
    private final AtomicLong offset = new AtomicLong(-1);

    /** In-memory view of the states, keyed by state name; access only while holding {@link #lock}. */
    private final Map<String, T> states = new HashMap<>();

    private volatile Status status = Status.CREATED;

    /**
     * Callback handed to the underlying log; applies each consumed record to the in-memory
     * state map and notifies the registered update listener, if any.
     */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, byte[]>> {
        public ConsumeCallback() {
        }

        /**
         * Applies one consumed record.
         *
         * <p>Records are ignored when the key is {@code null}, does not start with the key
         * prefix, has no {@code .} separator after the prefix, or belongs to another group.
         * The tracked offset is advanced for every successfully delivered record, including
         * ignored ones.
         *
         * @param th             the error reported by the consumer, or {@code null} on success
         * @param consumerRecord the consumed record; only read when {@code th} is {@code null}
         */
        @Override // io.streamthoughts.kafka.connect.filepulse.storage.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, byte[]> consumerRecord) {
            if (th != null) {
                LOG.error("Unexpected in consumer callback for KafkaStateBackingStore: ", th);
                return;
            }
            offset.set(consumerRecord.offset() + 1);
            final byte[] value = consumerRecord.value();
            final String key = consumerRecord.key();
            if (key == null || !key.startsWith(keyPrefix)) {
                LOG.warn("Discarding state update value with invalid key : {}", key);
                return;
            }
            // Expected remainder after the prefix: "<groupId>.<stateName>".
            final String[] parts = key.substring(keyPrefix.length()).split("\\.", 2);
            if (parts.length < 2) {
                // No separator: the key cannot be mapped to a (group, name) pair.
                LOG.warn("Discarding state update value with invalid key : {}", key);
                return;
            }
            final String recordGroup = parts[0];
            final String stateName = parts[1];
            if (!recordGroup.equals(groupId)) {
                LOG.trace("Discarding state update value - not belong to group {} : {}", groupId, key);
                return;
            }

            boolean removed = false;
            T state = null;
            synchronized (lock) {
                if (value == null) {
                    LOG.debug("Removed state {} due to null configuration. This is usually intentional and does not indicate an issue.", stateName);
                    states.remove(stateName);
                    removed = true;
                } else {
                    try {
                        state = serde.deserialize(value);
                        LOG.debug("Updating state for name {} : {}", stateName, state);
                        states.put(stateName, state);
                    } catch (Exception e) {
                        LOG.error("Failed to read state : {}", stateName, e);
                        return;
                    }
                }
            }

            // Read the listener once so a concurrent setUpdateListener(null) cannot cause
            // an NPE between the null check and the notification.
            final StateBackingStore.UpdateListener<T> listener = updateListener;
            if (status != Status.STARTED || listener == null) {
                return;
            }
            if (removed) {
                listener.onStateRemove(stateName);
            } else {
                listener.onStateUpdate(stateName, state);
            }
        }
    }

    /** Lifecycle status of the store. */
    public enum Status {
        CREATED,
        STARTED,
        PENDING_SHUTDOWN,
        SHUTDOWN
    }

    /**
     * Creates a new store. The underlying log is created here but not started.
     *
     * @param str        first argument passed to {@code KafkaBasedLogFactory#make}
     *                   (presumably the topic name - confirm against the factory)
     * @param str2       the prefix prepended to every record key
     * @param str3       the group identifier embedded in record keys; records of other groups are ignored
     * @param map        the configuration passed to {@code KafkaBasedLogFactory}
     * @param stateSerde the serde used to convert states to and from {@code byte[]}
     * @param z          passed to {@code KafkaBasedLog#start}; when {@code false},
     *                   {@link #refresh(long, TimeUnit)} is a no-op
     */
    public KafkaStateBackingStore(String str, String str2, String str3, Map<String, ?> map, StateSerde<T> stateSerde, boolean z) {
        this.kafkaLog = new KafkaBasedLogFactory(map).make(str, new ConsumeCallback());
        this.groupId = str3;
        this.serde = stateSerde;
        this.keyPrefix = str2;
        this.consumerEnabled = z;
    }

    /** Returns the current lifecycle status. */
    Status getState() {
        return this.status;
    }

    /**
     * Starts the underlying log and marks the store as started.
     *
     * @throws IllegalStateException if the store is already started
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public synchronized void start() {
        if (isStarted()) {
            throw new IllegalStateException("Cannot init again.");
        }
        LOG.info("Starting {}", getBackingStoreName());
        this.kafkaLog.start(this.consumerEnabled);
        this.status = Status.STARTED;
        LOG.info("Started {}", getBackingStoreName());
    }

    /** Returns {@code true} when the store status is {@link Status#STARTED}. */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public boolean isStarted() {
        return getState().equals(Status.STARTED);
    }

    /**
     * Flushes and stops the underlying log, then marks the store as shut down.
     *
     * <p>The log is stopped and the status set to {@link Status#SHUTDOWN} even when the
     * flush fails; the flush failure is still propagated to the caller.
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void stop() {
        synchronized (this) {
            LOG.info("Closing {}", getBackingStoreName());
            this.status = Status.PENDING_SHUTDOWN;
            try {
                this.kafkaLog.flush();
            } finally {
                // Always release the log, otherwise a failed flush would leave it running
                // and the store stuck in PENDING_SHUTDOWN.
                this.kafkaLog.stop();
                this.status = Status.SHUTDOWN;
            }
            LOG.info("Closed {}", getBackingStoreName());
        }
    }

    /**
     * Returns a point-in-time copy of the known states together with the current offset.
     *
     * @return a snapshot whose map is an unmodifiable copy, unaffected by later updates
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public StateSnapshot<T> snapshot() {
        synchronized (this.lock) {
            // Copy under the lock: a view over the live map would be read by callers
            // outside the lock while the consumer callback keeps mutating it.
            final Map<String, T> copy = new HashMap<>(this.states);
            return new StateSnapshot<>(this.offset.get(), Collections.unmodifiableMap(copy));
        }
    }

    /**
     * Tells whether a state is currently known in memory under the given name.
     *
     * @param str the state name
     * @return {@code true} if the in-memory map contains the name
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public boolean contains(String str) {
        synchronized (this.lock) {
            return this.states.containsKey(str);
        }
    }

    /**
     * Writes a state without re-reading the log afterwards.
     *
     * @param str the state name
     * @param t   the state to write
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void putAsync(String str, T t) {
        put(str, t, false);
    }

    /**
     * Writes a state, then re-reads the log to its end (bounded by a 30 second timeout).
     *
     * @param str the state name
     * @param t   the state to write
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void put(String str, T t) {
        put(str, t, true);
    }

    /**
     * Serializes and sends a state, optionally refreshing the in-memory view afterwards.
     *
     * @throws IllegalStateException if the store is shutting down or shut down
     * @throws RuntimeException      wrapping any failure raised while serializing, sending or refreshing
     */
    private void put(String name, T state, boolean sync) {
        checkStates();
        try {
            safeSend(name, this.serde.serialize(state));
            mayRefreshState(sync);
        } catch (Exception e) {
            LOG.error("Failed to write state to Kafka: ", e);
            throw new RuntimeException("Error writing state to Kafka", e);
        }
    }

    /**
     * Removes a state without re-reading the log afterwards.
     *
     * @param str the state name
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void removeAsync(String str) {
        remove(str, false);
    }

    /**
     * Sends a tombstone (null value) for the state, optionally refreshing afterwards.
     *
     * @throws IllegalStateException if the store is shutting down or shut down
     * @throws RuntimeException      wrapping any failure raised while sending or refreshing
     */
    private void remove(String name, boolean sync) {
        checkStates();
        LOG.debug("Removing state for name {}", name);
        try {
            safeSend(name, null);
            mayRefreshState(sync);
        } catch (Exception e) {
            LOG.error("Failed to remove state from Kafka: ", e);
            throw new RuntimeException("Error removing state from Kafka", e);
        }
    }

    /** Re-reads the log to its end when {@code sync} is set; a timeout is logged, not rethrown. */
    private void mayRefreshState(boolean sync) {
        if (!sync) {
            return;
        }
        try {
            refresh(DEFAULT_READ_TO_END_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            LOG.error("Failed to synchronize state from Kafka: TimeoutException", e);
        }
    }

    /**
     * Sends a record for the named state. Non-retriable send failures are logged; a
     * retriable failure is retried only when the record is a tombstone (null value).
     */
    private void safeSend(final String name, final byte[] value) {
        final String recordKey = newRecordKey(this.groupId, name);
        this.kafkaLog.send(recordKey, value, new org.apache.kafka.clients.producer.Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    return;
                }
                if (!(exc instanceof RetriableException)) {
                    LOG.error("Failed to write state update", exc);
                } else if (value == null) {
                    // Retry under the full record key: a tombstone sent under the bare
                    // state name would be rejected by the consumer's prefix check.
                    kafkaLog.send(recordKey, null, this);
                }
            }
        });
    }

    /**
     * Removes a state, then re-reads the log to its end (bounded by a 30 second timeout).
     *
     * @param str the state name
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void remove(String str) {
        remove(str, true);
    }

    /**
     * Blocks until the underlying log has been read to its end or the timeout elapses.
     * Does nothing (other than logging a warning) when the consumer is disabled.
     *
     * @param j        the maximum time to wait
     * @param timeUnit the unit of {@code j}
     * @throws TimeoutException      if the end of the log is not reached in time
     * @throws IllegalStateException if the store is shutting down or shut down
     * @throws RuntimeException      if the wait is interrupted (the interrupt flag is restored)
     *                               or the read fails
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void refresh(long j, TimeUnit timeUnit) throws TimeoutException {
        checkStates();
        if (!this.consumerEnabled) {
            LOG.warn("This KafkaStateBackingStore is running in producer mode only. Refresh is ignored.");
            return;
        }
        try {
            this.kafkaLog.readToEnd().get(j, timeUnit);
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Error trying to read to end of log", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error trying to read to end of log", e);
        }
    }

    /**
     * Registers the listener notified of state updates and removals consumed from the log.
     *
     * @param updateListener the listener, or {@code null} to disable notifications
     */
    @Override // io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore
    public void setUpdateListener(StateBackingStore.UpdateListener<T> updateListener) {
        this.updateListener = updateListener;
    }

    /** Returns the simple class name, used in lifecycle log messages. */
    private String getBackingStoreName() {
        return getClass().getSimpleName();
    }

    /**
     * Rejects operations once shutdown has begun.
     *
     * @throws IllegalStateException if the status is PENDING_SHUTDOWN or SHUTDOWN
     */
    private synchronized void checkStates() {
        if (this.status == Status.SHUTDOWN || this.status == Status.PENDING_SHUTDOWN) {
            throw new IllegalStateException("Bad state " + getState().name());
        }
    }

    /** Builds the record key {@code <keyPrefix><group>.<name>}. */
    private String newRecordKey(String group, String name) {
        return this.keyPrefix + group + "." + name;
    }
}
