package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.internal.Network;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.Collections;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/source/KafkaFileStateReporter.class */
/**
 * A {@link StateListener} that records every state transition of a source file
 * into a {@link StateBackingStore}.
 *
 * <p>Each callback builds a {@link SourceFile} snapshot from the metadata and offset
 * of the supplied {@link FileContext}, tags it with the matching {@link SourceStatus},
 * and hands it to the store through {@code putAsync}.
 */
public class KafkaFileStateReporter implements StateListener {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaFileStateReporter.class);

    /** Destination of the file state snapshots. */
    private final StateBackingStore<SourceFile> store;

    /** Used to derive the store key (partition JSON) from a file's metadata. */
    private final OffsetManager offsetManager;

    /**
     * Creates a new reporter.
     *
     * @param stateBackingStore the store that receives the file states; must not be {@code null}.
     * @param offsetManager     the manager used to compute the key of each entry; must not be {@code null}.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public KafkaFileStateReporter(StateBackingStore<SourceFile> stateBackingStore, OffsetManager offsetManager) {
        this.store = Objects.requireNonNull(stateBackingStore, "store can't be null");
        this.offsetManager = Objects.requireNonNull(offsetManager, "offsetManager can't be null");
    }

    /**
     * Writes the given state to the backing store.
     *
     * <p>The entry is keyed by {@code offsetManager.toPartitionJson(sourceMetadata)} and carries a
     * single-entry property map holding {@link Network#HOSTNAME} under the key {@code "hostname"}.
     *
     * @param sourceMetadata the metadata of the source file; must not be {@code null}.
     * @param sourceOffset   the offset reached in the source file; must not be {@code null}.
     * @param sourceStatus   the status to report; must not be {@code null}.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public void notify(SourceMetadata sourceMetadata, SourceOffset sourceOffset, SourceStatus sourceStatus) {
        Objects.requireNonNull(sourceMetadata, "metadata can't be null");
        Objects.requireNonNull(sourceOffset, "offset can't be null");
        Objects.requireNonNull(sourceStatus, "status can't be null");
        final String key = this.offsetManager.toPartitionJson(sourceMetadata);
        final SourceFile state = new SourceFile(
                sourceMetadata,
                sourceOffset,
                sourceStatus,
                Collections.singletonMap("hostname", Network.HOSTNAME));
        this.store.putAsync(key, state);
    }

    /**
     * Reports the file as {@link SourceStatus#SCHEDULED}.
     *
     * @param fileContext the context of the scheduled file; must not be {@code null}.
     * @throws NullPointerException if {@code fileContext} is {@code null}.
     */
    public void onScheduled(FileContext fileContext) {
        Objects.requireNonNull(fileContext, "context can't be null");
        final SourceMetadata metadata = fileContext.metadata();
        LOG.debug("Scheduling source file '{}'", metadata);
        notify(metadata, fileContext.offset(), SourceStatus.SCHEDULED);
    }

    /**
     * Reports the file as {@link SourceStatus#INVALID}.
     *
     * @param fileContext the context of the invalid file; must not be {@code null}.
     * @throws NullPointerException if {@code fileContext} is {@code null}.
     */
    public void onInvalid(FileContext fileContext) {
        Objects.requireNonNull(fileContext, "context can't be null");
        final SourceMetadata metadata = fileContext.metadata();
        // Logged at debug level like the other non-failure transitions.
        LOG.debug("Invalid source file '{}'", metadata);
        notify(metadata, fileContext.offset(), SourceStatus.INVALID);
    }

    /**
     * Reports the file as {@link SourceStatus#STARTED}.
     *
     * @param fileContext the context of the file being started; must not be {@code null}.
     * @throws NullPointerException if {@code fileContext} is {@code null}.
     */
    public void onStart(FileContext fileContext) {
        Objects.requireNonNull(fileContext, "context can't be null");
        final SourceMetadata metadata = fileContext.metadata();
        LOG.debug("Starting to process source file '{}'", metadata);
        notify(metadata, fileContext.offset(), SourceStatus.STARTED);
    }

    /**
     * Reports the file as {@link SourceStatus#COMPLETED}.
     *
     * @param fileContext the context of the completed file; must not be {@code null}.
     * @throws NullPointerException if {@code fileContext} is {@code null}.
     */
    public void onCompleted(FileContext fileContext) {
        Objects.requireNonNull(fileContext, "context can't be null");
        final SourceMetadata metadata = fileContext.metadata();
        LOG.debug("Completed source file '{}'", metadata);
        notify(metadata, fileContext.offset(), SourceStatus.COMPLETED);
    }

    /**
     * Logs the error and reports the file as {@link SourceStatus#FAILED}.
     *
     * @param fileContext the context of the failed file; must not be {@code null}.
     * @param th          the error that interrupted processing; passed to the logger as the cause.
     * @throws NullPointerException if {@code fileContext} is {@code null}.
     */
    public void onFailure(FileContext fileContext, Throwable th) {
        Objects.requireNonNull(fileContext, "context can't be null");
        final SourceMetadata metadata = fileContext.metadata();
        LOG.error("Error while processing source file '{}'", metadata, th);
        notify(metadata, fileContext.offset(), SourceStatus.FAILED);
    }
}
