package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/Cassandra4CommitLogProcessor.class */
public class Cassandra4CommitLogProcessor extends AbstractProcessor {
    private static final String NAME = "Commit Log Processor";
    private final CassandraConnectorContext context;
    private final File cdcDir;
    private AbstractDirectoryWatcher watcher;
    private final List<ChangeEventQueue<Event>> queues;
    private final CommitLogProcessorMetrics metrics;
    private boolean initial;
    private final boolean errorCommitLogReprocessEnabled;
    private final CommitLogTransfer commitLogTransfer;
    private final ExecutorService executorService;
    private static final Logger LOGGER = LoggerFactory.getLogger(Cassandra4CommitLogProcessor.class);
    static final Set<Pair<CommitLogProcessingCallable, Future<ProcessingResult>>> submittedProcessings = ConcurrentHashMap.newKeySet();

    /* loaded from: input_file:io/debezium/connector/cassandra/Cassandra4CommitLogProcessor$CommitLogProcessingCallable.class */
    public static class CommitLogProcessingCallable implements Callable<ProcessingResult> {
        private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogProcessingCallable.class);
        private final LogicalCommitLog commitLog;
        private final List<ChangeEventQueue<Event>> queues;
        private final CommitLogProcessorMetrics metrics;
        private final Cassandra4CommitLogReadHandlerImpl commitLogReadHandler;
        private final CommitLogTransfer commitLogTransfer;
        private final Set<String> erroneousCommitLogs;
        private boolean completePrematurely = false;
        private CommitLogReader commitLogReader = new CommitLogReader();

        public CommitLogProcessingCallable(LogicalCommitLog logicalCommitLog, List<ChangeEventQueue<Event>> list, CommitLogProcessorMetrics commitLogProcessorMetrics, CassandraConnectorContext cassandraConnectorContext) {
            this.commitLog = logicalCommitLog;
            this.queues = list;
            this.metrics = commitLogProcessorMetrics;
            this.commitLogReadHandler = new Cassandra4CommitLogReadHandlerImpl(cassandraConnectorContext.getSchemaHolder(), cassandraConnectorContext.getQueues(), cassandraConnectorContext.getOffsetWriter(), new RecordMaker(cassandraConnectorContext.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(cassandraConnectorContext.getCassandraConnectorConfig().fieldExcludeList()), cassandraConnectorContext.getCassandraConnectorConfig()), commitLogProcessorMetrics);
            this.commitLogTransfer = cassandraConnectorContext.getCassandraConnectorConfig().getCommitLogTransfer();
            this.erroneousCommitLogs = cassandraConnectorContext.getErroneousCommitLogs();
        }

        public void complete() {
            this.completePrematurely = true;
        }

        private ProcessingResult callInternal() {
            ProcessingResult processingResult;
            if (!this.commitLog.exists()) {
                LOGGER.warn("Commit log " + this.commitLog + " does not exist!");
                return new ProcessingResult(this.commitLog, ProcessingResult.Result.DOES_NOT_EXIST);
            }
            LOGGER.info("Processing commit log {}", this.commitLog.log.toString());
            CommitLogPosition commitLogPosition = new CommitLogPosition(this.commitLog.commitLogSegmentId, 0);
            this.metrics.setCommitLogFilename(this.commitLog.log.toString());
            this.metrics.setCommitLogPosition(commitLogPosition.position);
            try {
                parseIndexFile();
                while (!this.commitLog.completed) {
                    if (this.completePrematurely) {
                        LOGGER.info("{} completed prematurely", this.commitLog.toString());
                        return new ProcessingResult(this.commitLog, ProcessingResult.Result.COMPLETED_PREMATURELY);
                    }
                    Thread.sleep(10000L);
                    parseIndexFile();
                }
                LOGGER.info(this.commitLog.toString());
                try {
                    processCommitLog(this.commitLog, new CommitLogPosition(this.commitLog.commitLogSegmentId, 0));
                    processingResult = new ProcessingResult(this.commitLog);
                } catch (Exception e) {
                    processingResult = new ProcessingResult(this.commitLog, ProcessingResult.Result.ERROR, e);
                }
                LOGGER.info("{}", processingResult);
                return processingResult;
            } catch (Exception e2) {
                LOGGER.error("Processing of {} errored out", this.commitLog.toString());
                return new ProcessingResult(this.commitLog, ProcessingResult.Result.ERROR, e2);
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public ProcessingResult call() {
            ProcessingResult callInternal = callInternal();
            Cassandra4CommitLogProcessor.submittedProcessings.remove(this);
            return callInternal;
        }

        private void processCommitLog(LogicalCommitLog logicalCommitLog, CommitLogPosition commitLogPosition) {
            try {
                try {
                    this.commitLogReader.readCommitLogSegment(this.commitLogReadHandler, logicalCommitLog.log, commitLogPosition, false);
                    this.queues.get(Math.abs(logicalCommitLog.log.getName().hashCode() % this.queues.size())).enqueue(new EOFEvent(logicalCommitLog.log));
                } catch (Exception e) {
                    if (this.commitLogTransfer.getClass().getName().equals("io.debezium.connector.cassandra.BlackHoleCommitLogTransfer")) {
                        throw new DebeziumException(String.format("Error occurred while processing commit log %s", logicalCommitLog.log), e);
                    }
                    Cassandra4CommitLogProcessor.LOGGER.error("Error occurred while processing commit log " + logicalCommitLog.log, e);
                    this.queues.get(Math.abs(logicalCommitLog.log.getName().hashCode() % this.queues.size())).enqueue(new EOFEvent(logicalCommitLog.log));
                    this.erroneousCommitLogs.add(logicalCommitLog.log.getName());
                }
            } catch (InterruptedException e2) {
                throw new CassandraConnectorTaskException(String.format("Enqueuing has been interrupted while enqueuing EOF Event for file %s", logicalCommitLog.log.getName()), e2);
            }
        }

        private void parseIndexFile() throws DebeziumException {
            try {
                this.commitLog.parseCommitLogIndex();
            } catch (DebeziumException e) {
                this.erroneousCommitLogs.add(this.commitLog.log.getName());
                throw e;
            }
        }
    }

    /* loaded from: input_file:io/debezium/connector/cassandra/Cassandra4CommitLogProcessor$LogicalCommitLog.class */
    public static class LogicalCommitLog {
        CommitLogPosition commitLogPosition;
        File log;
        File index;
        long commitLogSegmentId;
        int offsetOfEndOfLastWrittenCDCMutation = 0;
        boolean completed = false;

        public LogicalCommitLog(File file) {
            this.index = file;
            this.log = parseCommitLogName(file);
            this.commitLogSegmentId = parseSegmentId(this.log);
            this.commitLogPosition = new CommitLogPosition(this.commitLogSegmentId, 0);
        }

        public static File parseCommitLogName(File file) {
            return file.toPath().getParent().resolve(file.toPath().getFileName().toString().replace("_cdc.idx", ".log")).toFile();
        }

        public static long parseSegmentId(File file) {
            return Long.parseLong(file.getName().split("-")[2].split("\\.")[0]);
        }

        public boolean exists() {
            return this.log.exists();
        }

        public void parseCommitLogIndex() throws DebeziumException {
            if (this.index.exists()) {
                try {
                    List<String> readAllLines = Files.readAllLines(this.index.toPath(), StandardCharsets.UTF_8);
                    if (readAllLines.isEmpty()) {
                        return;
                    }
                    this.offsetOfEndOfLastWrittenCDCMutation = Integer.parseInt(readAllLines.get(0));
                    if (readAllLines.size() == 2) {
                        this.completed = "COMPLETED".equals(readAllLines.get(1));
                    }
                } catch (Exception e) {
                    throw new DebeziumException(String.format("Unable to parse commit log index file %s", this.index.toPath()), e);
                }
            }
        }

        public String toString() {
            return "LogicalCommitLog{commitLogPosition=" + this.commitLogPosition + ", synced=" + this.offsetOfEndOfLastWrittenCDCMutation + ", completed=" + this.completed + ", log=" + this.log + ", index=" + this.index + ", commitLogSegmentId=" + this.commitLogSegmentId + '}';
        }
    }

    /* loaded from: input_file:io/debezium/connector/cassandra/Cassandra4CommitLogProcessor$ProcessingResult.class */
    public static class ProcessingResult {
        public final LogicalCommitLog commitLog;
        public final Result result;
        public final Exception ex;

        /* loaded from: input_file:io/debezium/connector/cassandra/Cassandra4CommitLogProcessor$ProcessingResult$Result.class */
        public enum Result {
            OK,
            ERROR,
            DOES_NOT_EXIST,
            COMPLETED_PREMATURELY
        }

        public ProcessingResult(LogicalCommitLog logicalCommitLog) {
            this(logicalCommitLog, Result.OK, null);
        }

        public ProcessingResult(LogicalCommitLog logicalCommitLog, Result result) {
            this(logicalCommitLog, result, null);
        }

        public ProcessingResult(LogicalCommitLog logicalCommitLog, Result result, Exception exc) {
            this.commitLog = logicalCommitLog;
            this.result = result;
            this.ex = exc;
        }

        public String toString() {
            return "ProcessingResult{commitLog=" + this.commitLog + ", result=" + this.result + ", ex=" + (this.ex != null ? this.ex.getMessage() : "none") + '}';
        }
    }

    public Cassandra4CommitLogProcessor(CassandraConnectorContext cassandraConnectorContext) {
        super(NAME, Duration.ZERO);
        this.metrics = new CommitLogProcessorMetrics();
        this.initial = true;
        this.context = cassandraConnectorContext;
        this.executorService = Executors.newSingleThreadExecutor();
        this.queues = this.context.getQueues();
        this.commitLogTransfer = this.context.getCassandraConnectorConfig().getCommitLogTransfer();
        this.errorCommitLogReprocessEnabled = this.context.getCassandraConnectorConfig().errorCommitLogReprocessEnabled();
        this.cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
    }

    public void initialize() {
        this.metrics.registerMetrics();
    }

    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    public void stop() {
        try {
            this.executorService.shutdown();
            for (Pair<CommitLogProcessingCallable, Future<ProcessingResult>> pair : submittedProcessings) {
                try {
                    ((CommitLogProcessingCallable) pair.getFirst()).complete();
                    ((Future) pair.getSecond()).get();
                } catch (Exception e) {
                    LOGGER.warn("Waiting for submitted task to finish has failed.");
                }
            }
            super.stop();
        } catch (Exception e2) {
            throw new RuntimeException("Unable to close executor service in CommitLogProcessor in a timely manner");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submit(Path path) {
        CommitLogProcessingCallable commitLogProcessingCallable = new CommitLogProcessingCallable(new LogicalCommitLog(path.toFile()), this.queues, this.metrics, this.context);
        submittedProcessings.add(new Pair<>(commitLogProcessingCallable, this.executorService.submit(commitLogProcessingCallable)));
    }

    public boolean isRunning() {
        return (!super.isRunning() || this.executorService.isShutdown() || this.executorService.isTerminated()) ? false : true;
    }

    public void process() throws IOException, InterruptedException {
        LOGGER.debug("Processing commitLogFiles while initial is {}", Boolean.valueOf(this.initial));
        if (this.watcher == null) {
            this.watcher = new AbstractDirectoryWatcher(this.cdcDir.toPath(), this.context.getCassandraConnectorConfig().cdcDirPollInterval(), Collections.singleton(StandardWatchEventKinds.ENTRY_CREATE)) { // from class: io.debezium.connector.cassandra.Cassandra4CommitLogProcessor.1
                void handleEvent(WatchEvent<?> watchEvent, Path path) {
                    if (Cassandra4CommitLogProcessor.this.isRunning() && path.getFileName().toString().endsWith("_cdc.idx")) {
                        Cassandra4CommitLogProcessor.this.submit(path);
                    }
                }
            };
        }
        if (this.initial) {
            LOGGER.info("Reading existing commit logs in {}", this.cdcDir);
            File[] indexes = CommitLogUtil.getIndexes(this.cdcDir);
            Arrays.sort(indexes, CommitLogUtil::compareCommitLogsIndexes);
            for (File file : indexes) {
                if (isRunning()) {
                    submit(file.toPath());
                }
            }
            if (this.errorCommitLogReprocessEnabled) {
                LOGGER.info("CommitLog Error Processing is enabled. Attempting to get all error commitLog files for re-processing.");
                this.commitLogTransfer.getErrorCommitLogFiles();
            }
            this.initial = false;
        }
        this.watcher.poll();
    }
}
