package io.debezium.embedded.async;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.config.Instantiator;
import io.debezium.embedded.ConverterBuilder;
import io.debezium.embedded.DebeziumEngineCommon;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.embedded.EmbeddedEngineSignaler;
import io.debezium.embedded.EmbeddedWorkerConfig;
import io.debezium.embedded.KafkaConnectUtil;
import io.debezium.embedded.Transformations;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.StopEngineException;
import io.debezium.engine.format.ChangeEventFormat;
import io.debezium.engine.format.KeyValueChangeEventFormat;
import io.debezium.engine.format.KeyValueHeaderChangeEventFormat;
import io.debezium.engine.source.DebeziumSourceConnectorContext;
import io.debezium.engine.source.EngineSourceConnector;
import io.debezium.engine.source.EngineSourceConnectorContext;
import io.debezium.engine.source.EngineSourceTask;
import io.debezium.engine.source.EngineSourceTaskContext;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine.class */
public final class AsyncEmbeddedEngine<R> implements DebeziumEngine<R>, AsyncEngineConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncEmbeddedEngine.class);
    private final Configuration config;
    private final Clock clock;
    private final ClassLoader classLoader;
    private final Consumer<R> consumer;
    private final DebeziumEngine.ChangeConsumer<R> handler;
    private final DebeziumEngine.CompletionCallback completionCallback;
    private final Optional<DebeziumEngine.ConnectorCallback> connectorCallback;
    private final Converter offsetKeyConverter;
    private final Converter offsetValueConverter;
    private final WorkerConfig workerConfig;
    private final OffsetCommitPolicy offsetCommitPolicy;
    private final EngineSourceConnector connector;
    private final Transformations transformations;
    private final HeaderConverter headerConverter;
    private final Function<SourceRecord, R> recordConverter;
    private final Function<R, SourceRecord> sourceConverter;
    private final ExecutorService taskService;
    private final ExecutorService recordService;
    private DebeziumEngine.Signaler signaler;
    private final AtomicReference<State> state = new AtomicReference<>(State.CREATING);
    private final List<EngineSourceTask> tasks = new ArrayList();
    private final List<Future<Void>> pollingFutures = new ArrayList();
    private final CountDownLatch shutDownLatch = new CountDownLatch(1);

    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$AsyncEngineBuilder.class */
    public static final class AsyncEngineBuilder<R> implements DebeziumEngine.Builder<R> {
        private Properties config;
        private Consumer<R> consumer;
        private DebeziumEngine.ChangeConsumer<?> handler;
        private ClassLoader classLoader;
        private Clock clock;
        private DebeziumEngine.CompletionCallback completionCallback;
        private DebeziumEngine.ConnectorCallback connectorCallback;
        private OffsetCommitPolicy offsetCommitPolicy;
        private HeaderConverter headerConverter;
        private Function<SourceRecord, R> recordConverter;
        private ConverterBuilder converterBuilder;

        AsyncEngineBuilder() {
            this((KeyValueHeaderChangeEventFormat<?, ?, ?>) null);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncEngineBuilder(ChangeEventFormat<?> changeEventFormat) {
            this((KeyValueHeaderChangeEventFormat<?, ?, ?>) KeyValueHeaderChangeEventFormat.of((Class) null, changeEventFormat.getValueFormat(), (Class) null));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncEngineBuilder(KeyValueChangeEventFormat<?, ?> keyValueChangeEventFormat) {
            this((KeyValueHeaderChangeEventFormat<?, ?, ?>) (keyValueChangeEventFormat instanceof KeyValueHeaderChangeEventFormat ? (KeyValueHeaderChangeEventFormat) keyValueChangeEventFormat : KeyValueHeaderChangeEventFormat.of(keyValueChangeEventFormat.getKeyFormat(), keyValueChangeEventFormat.getValueFormat(), (Class) null)));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncEngineBuilder(KeyValueHeaderChangeEventFormat<?, ?, ?> keyValueHeaderChangeEventFormat) {
            this.offsetCommitPolicy = null;
            if (keyValueHeaderChangeEventFormat != null) {
                this.converterBuilder = new ConverterBuilder();
                this.converterBuilder.using(keyValueHeaderChangeEventFormat);
            }
        }

        public DebeziumEngine.Builder<R> notifying(Consumer<R> consumer) {
            this.consumer = consumer;
            if (this.config.contains(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name()) && this.config.getProperty(AsyncEngineConfig.RECORD_PROCESSING_WITH_SERIAL_CONSUMER.name()).equalsIgnoreCase("true")) {
                if (this.recordConverter == null) {
                    this.handler = AsyncEmbeddedEngine.buildDefaultChangeConsumer(consumer);
                } else {
                    this.handler = AsyncEmbeddedEngine.buildConvertingChangeConsumer(consumer, this.recordConverter);
                }
            }
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public DebeziumEngine.Builder<R> notifying(DebeziumEngine.ChangeConsumer<R> changeConsumer) {
            this.handler = changeConsumer;
            if (!this.config.contains(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name()) && !changeConsumer.supportsTombstoneEvents()) {
                AsyncEmbeddedEngine.LOGGER.info("Consumer doesn't support tombstone events, setting '{}' to false.", CommonConnectorConfig.TOMBSTONES_ON_DELETE.name());
                this.config.put(CommonConnectorConfig.TOMBSTONES_ON_DELETE.name(), "false");
            }
            return this;
        }

        public DebeziumEngine.Builder<R> using(Properties properties) {
            this.config = properties;
            if (this.converterBuilder != null) {
                this.converterBuilder.using(properties);
            }
            return this;
        }

        public DebeziumEngine.Builder<R> using(ClassLoader classLoader) {
            this.classLoader = classLoader;
            return this;
        }

        public DebeziumEngine.Builder<R> using(java.time.Clock clock) {
            Objects.requireNonNull(clock);
            this.clock = clock::millis;
            return this;
        }

        public DebeziumEngine.Builder<R> using(DebeziumEngine.CompletionCallback completionCallback) {
            this.completionCallback = completionCallback;
            return this;
        }

        public DebeziumEngine.Builder<R> using(DebeziumEngine.ConnectorCallback connectorCallback) {
            this.connectorCallback = connectorCallback;
            return this;
        }

        public DebeziumEngine.Builder<R> using(OffsetCommitPolicy offsetCommitPolicy) {
            this.offsetCommitPolicy = offsetCommitPolicy;
            return this;
        }

        public DebeziumEngine<R> build() {
            if (this.converterBuilder != null) {
                this.headerConverter = this.converterBuilder.headerConverter();
                this.recordConverter = this.converterBuilder.toFormat(this.headerConverter);
            }
            return new AsyncEmbeddedEngine(this.config, this.consumer, this.handler, this.classLoader, this.clock, this.completionCallback, this.connectorCallback, this.offsetCommitPolicy, this.headerConverter, this.recordConverter);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$ConvertingRecordCommitter.class */
    public class ConvertingRecordCommitter implements DebeziumEngine.RecordCommitter<R> {
        private final SourceRecordCommitter delegate;

        ConvertingRecordCommitter(EngineSourceTask engineSourceTask) {
            this.delegate = new SourceRecordCommitter(engineSourceTask);
        }

        public void markProcessed(R r) throws InterruptedException {
            this.delegate.markProcessed(AsyncEmbeddedEngine.this.sourceConverter.apply(r));
        }

        public void markBatchFinished() throws InterruptedException {
            this.delegate.markBatchFinished();
        }

        public void markProcessed(R r, DebeziumEngine.Offsets offsets) throws InterruptedException {
            this.delegate.markProcessed(AsyncEmbeddedEngine.this.sourceConverter.apply(r), offsets);
        }

        public DebeziumEngine.Offsets buildOffsets() {
            return this.delegate.buildOffsets();
        }
    }

    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$DefaultCompletionCallback.class */
    private static class DefaultCompletionCallback implements DebeziumEngine.CompletionCallback {
        private DefaultCompletionCallback() {
        }

        public void handle(boolean z, String str, Throwable th) {
            if (z) {
                return;
            }
            AsyncEmbeddedEngine.LOGGER.error(str, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$PollRecords.class */
    public static class PollRecords extends RetryingCallable<Void> {
        final EngineSourceTask task;
        final RecordProcessor processor;
        final AtomicReference<State> engineState;

        PollRecords(EngineSourceTask engineSourceTask, RecordProcessor recordProcessor, AtomicReference<State> atomicReference) {
            super(Configuration.from(engineSourceTask.context().config()).getInteger(EmbeddedEngineConfig.ERRORS_MAX_RETRIES));
            this.task = engineSourceTask;
            this.processor = recordProcessor;
            this.engineState = atomicReference;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // io.debezium.embedded.async.RetryingCallable
        public Void doCall() throws Exception {
            while (this.engineState.get() == State.POLLING_TASKS) {
                AsyncEmbeddedEngine.LOGGER.trace("Thread {} running task {} starts polling for records.", Thread.currentThread().getName(), this.task.connectTask());
                List<SourceRecord> poll = this.task.connectTask().poll();
                AsyncEmbeddedEngine.LOGGER.trace("Thread {} polled {} records.", Thread.currentThread().getName(), poll == null ? "no" : Integer.valueOf(poll.size()));
                if (poll == null || poll.isEmpty()) {
                    AsyncEmbeddedEngine.LOGGER.trace("No records.");
                } else {
                    this.processor.processRecords(poll);
                }
            }
            return null;
        }

        @Override // io.debezium.embedded.async.RetryingCallable
        public DelayStrategy delayStrategy() {
            Configuration from = Configuration.from(this.task.context().config());
            return DelayStrategy.exponential(Duration.ofMillis(from.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_INITIAL_MS)), Duration.ofMillis(from.getInteger(EmbeddedEngineConfig.ERRORS_RETRY_DELAY_MAX_MS)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$ProcessingCores.class */
    public enum ProcessingCores {
        AVAILABLE_CORES("AVAILABLE_CORES", AsyncEngineConfig.AVAILABLE_CORES);

        private final String coresPlaceholder;
        private final int cores;

        ProcessingCores(String str, int i) {
            this.coresPlaceholder = str;
            this.cores = i;
        }

        public int getCores() {
            return this.cores;
        }

        public static ProcessingCores parse(String str) {
            if (str == null) {
                return null;
            }
            String trim = str.trim();
            for (ProcessingCores processingCores : values()) {
                if (processingCores.coresPlaceholder.equalsIgnoreCase(trim)) {
                    return processingCores;
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$RecordProcessingOrder.class */
    public enum RecordProcessingOrder {
        ORDERED("ORDERED"),
        UNORDERED("UNORDERED");

        private final String orderingPlaceholder;

        RecordProcessingOrder(String str) {
            this.orderingPlaceholder = str;
        }

        public static RecordProcessingOrder parse(String str) {
            if (str == null) {
                return null;
            }
            String trim = str.trim();
            for (RecordProcessingOrder recordProcessingOrder : values()) {
                if (recordProcessingOrder.orderingPlaceholder.equalsIgnoreCase(trim)) {
                    return recordProcessingOrder;
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$SourceRecordCommitter.class */
    public static class SourceRecordCommitter implements DebeziumEngine.RecordCommitter<SourceRecord> {
        final SourceTask task;
        final OffsetStorageWriter offsetWriter;
        final OffsetCommitPolicy offsetCommitPolicy;
        final Clock clock;
        final long commitTimeout;
        private long recordsSinceLastCommit = 0;
        private long timeOfLastCommitMillis = 0;

        SourceRecordCommitter(EngineSourceTask engineSourceTask) {
            this.task = engineSourceTask.connectTask();
            this.offsetWriter = engineSourceTask.context().offsetStorageWriter();
            this.offsetCommitPolicy = engineSourceTask.context().offsetCommitPolicy();
            this.clock = engineSourceTask.context().clock();
            this.commitTimeout = Configuration.from(engineSourceTask.context().config()).getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS);
        }

        public void markProcessed(SourceRecord sourceRecord) throws InterruptedException {
            this.task.commitRecord(sourceRecord);
            this.recordsSinceLastCommit++;
            this.offsetWriter.offset(sourceRecord.sourcePartition(), sourceRecord.sourceOffset());
        }

        public void markBatchFinished() throws InterruptedException {
            if (this.offsetCommitPolicy.performCommit(this.recordsSinceLastCommit, Duration.ofMillis(this.clock.currentTimeInMillis() - this.timeOfLastCommitMillis))) {
                try {
                    if (AsyncEmbeddedEngine.commitOffsets(this.offsetWriter, this.clock, this.commitTimeout, this.task)) {
                        this.recordsSinceLastCommit = 0L;
                        this.timeOfLastCommitMillis = this.clock.currentTimeInMillis();
                    }
                } catch (TimeoutException e) {
                    throw new DebeziumException("Timed out while waiting for committing task offset", e);
                }
            }
        }

        public void markProcessed(SourceRecord sourceRecord, DebeziumEngine.Offsets offsets) throws InterruptedException {
            markProcessed(new SourceRecord(sourceRecord.sourcePartition(), ((DebeziumEngineCommon.SourceRecordOffsets) offsets).getOffsets(), sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), sourceRecord.valueSchema(), sourceRecord.value(), sourceRecord.timestamp(), sourceRecord.headers()));
        }

        public DebeziumEngine.Offsets buildOffsets() {
            return new DebeziumEngineCommon.SourceRecordOffsets();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/embedded/async/AsyncEmbeddedEngine$State.class */
    public enum State {
        CREATING,
        INITIALIZING,
        CREATING_TASKS,
        STARTING_TASKS,
        POLLING_TASKS,
        STOPPING,
        STOPPED;

        public static boolean shouldStopTasks(State state) {
            return STARTING_TASKS.compareTo(state) <= 0;
        }

        public static boolean canBeStopped(State state) {
            return STOPPING.compareTo(state) > 0;
        }
    }

    private AsyncEmbeddedEngine(Properties properties, Consumer<R> consumer, DebeziumEngine.ChangeConsumer<R> changeConsumer, ClassLoader classLoader, Clock clock, DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback, OffsetCommitPolicy offsetCommitPolicy, HeaderConverter headerConverter, Function<SourceRecord, R> function) {
        OffsetCommitPolicy offsetCommitPolicy2;
        this.config = Configuration.from((Properties) Objects.requireNonNull(properties, "A connector configuration must be specified."));
        this.consumer = consumer;
        this.handler = changeConsumer;
        this.classLoader = classLoader == null ? Instantiator.getClassLoader() : classLoader;
        this.clock = clock == null ? Clock.system() : clock;
        this.completionCallback = completionCallback != null ? completionCallback : new DefaultCompletionCallback();
        this.connectorCallback = Optional.ofNullable(connectorCallback);
        this.headerConverter = headerConverter;
        this.recordConverter = function;
        this.sourceConverter = obj -> {
            return ((EmbeddedEngineChangeEvent) obj).sourceRecord();
        };
        if ((this.handler == null) && (this.consumer == null)) {
            throw new DebeziumException("Either java.util.function.Consumer or DebeziumEngine.ChangeConsumer must be specified.");
        }
        if (this.handler == null && RecordProcessingOrder.parse(this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER)) == null) {
            throw new DebeziumException(String.format("'%s' is not a valid 'record.processing.order' options", this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER)));
        }
        this.taskService = Executors.newFixedThreadPool(this.config.getInteger("tasks.max", () -> {
            return 1;
        }).intValue());
        String string = this.config.getString(RECORD_PROCESSING_THREADS);
        if (string == null || string.isBlank()) {
            this.recordService = new ThreadPoolExecutor(0, AsyncEngineConfig.AVAILABLE_CORES, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        } else {
            this.recordService = Executors.newFixedThreadPool(computeRecordThreads(string));
        }
        Configuration configuration = this.config;
        Field.Set set = AsyncEngineConfig.CONNECTOR_FIELDS;
        Logger logger = LOGGER;
        Objects.requireNonNull(logger);
        if (!configuration.validateAndRecord(set, logger::error)) {
            DebeziumException debeziumException = new DebeziumException("Failed to start connector with invalid configuration (see logs for actual errors)", (Throwable) null);
            this.completionCallback.handle(false, "Failed to start connector with invalid configuration (see logs for actual errors)", debeziumException);
            throw debeziumException;
        }
        this.workerConfig = new EmbeddedWorkerConfig(this.config.asMap(AsyncEngineConfig.ALL_FIELDS));
        if (offsetCommitPolicy == null) {
            try {
                offsetCommitPolicy2 = (OffsetCommitPolicy) Instantiator.getInstanceWithProperties(this.config.getString(AsyncEngineConfig.OFFSET_COMMIT_POLICY), properties);
            } catch (Throwable th) {
                this.completionCallback.handle(false, "Failed to instantiate required class", th);
                throw new DebeziumException(th);
            }
        } else {
            offsetCommitPolicy2 = offsetCommitPolicy;
        }
        this.offsetCommitPolicy = offsetCommitPolicy2;
        this.offsetKeyConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        this.offsetValueConverter = (Converter) Instantiator.getInstance(JsonConverter.class.getName());
        this.transformations = new Transformations(Configuration.from(properties));
        this.connector = new EngineSourceConnector((SourceConnector) this.classLoader.loadClass(this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS)).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]));
        Map singletonMap = Collections.singletonMap("schemas.enable", "false");
        this.offsetKeyConverter.configure(singletonMap, true);
        this.offsetValueConverter.configure(singletonMap, false);
    }

    List<EngineSourceTask> tasks() {
        return this.tasks;
    }

    public void run() {
        Throwable th = null;
        try {
            try {
                LOGGER.debug("Initializing connector and starting it.");
                setEngineState(State.CREATING, State.INITIALIZING);
                this.connector.connectConnector().start(initializeConnector());
                LOGGER.debug("Calling connector callback after connector has started.");
                this.connectorCallback.ifPresent((v0) -> {
                    v0.connectorStarted();
                });
                LOGGER.debug("Creating source tasks.");
                setEngineState(State.INITIALIZING, State.CREATING_TASKS);
                createSourceTasks(this.connector, this.tasks);
                LOGGER.debug("Starting source tasks.");
                setEngineState(State.CREATING_TASKS, State.STARTING_TASKS);
                startSourceTasks(this.tasks);
                LOGGER.debug("Starting tasks polling.");
                setEngineState(State.STARTING_TASKS, State.POLLING_TASKS);
                runTasksPolling(this.tasks);
                finishShutDown(null);
            } catch (Throwable th2) {
                th = th2;
                closeEngineWithException(th);
                finishShutDown(th);
            }
        } catch (Throwable th3) {
            finishShutDown(th);
            throw th3;
        }
    }

    public void close() throws IOException {
        LOGGER.debug("Engine shutdown called.");
        State engineState = getEngineState();
        if (engineState == State.STARTING_TASKS) {
            throw new IllegalStateException("Cannot stop engine while tasks are starting, this may lead to leaked resource. Wait for the tasks to be fully started.");
        }
        if (engineState == State.STOPPING) {
            throw new IllegalStateException("Engine is already being shutting down.");
        }
        if (engineState == State.STOPPED) {
            throw new IllegalStateException("Engine has been already shut down.");
        }
        LOGGER.debug("Stopping " + AsyncEmbeddedEngine.class.getName());
        setEngineState(engineState, State.STOPPING);
        close(engineState);
    }

    public void runWithTask(Consumer<SourceTask> consumer) {
        Iterator<EngineSourceTask> it = this.tasks.iterator();
        while (it.hasNext()) {
            consumer.accept(it.next().connectTask());
        }
    }

    private void close(State state) {
        try {
            stopConnector(this.tasks, state);
        } catch (Exception e) {
            LOGGER.warn("Failed to stop connector properly: ", e);
        }
        if (this.headerConverter != null) {
            try {
                this.headerConverter.close();
            } catch (IOException e2) {
                LOGGER.warn("Failed to close header converter: ", e2);
            }
        }
        if (this.transformations != null) {
            try {
                this.transformations.close();
            } catch (IOException e3) {
                LOGGER.warn("Failed to close transformations: ", e3);
            }
        }
        this.shutDownLatch.countDown();
    }

    private void closeEngineWithException(Throwable th) {
        LOGGER.error("Engine has failed with ", th);
        State engineState = getEngineState();
        if (State.canBeStopped(engineState)) {
            LOGGER.debug("Stopping " + AsyncEmbeddedEngine.class.getName());
            setEngineState(engineState, State.STOPPING);
            try {
                close(engineState);
            } catch (Throwable th2) {
                LOGGER.error("Failed to close the engine: ", th2);
            }
        }
    }

    private void finishShutDown(Throwable th) {
        try {
            this.shutDownLatch.await(this.config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOGGER.warn("Interrupted while waiting for shutdown to finish.");
        }
        LOGGER.info("Engine is stopped.");
        setEngineState(State.STOPPING, State.STOPPED);
        LOGGER.debug("Calling completion handler.");
        callCompletionHandler(th);
    }

    private Map<String, String> initializeConnector() throws Exception {
        LOGGER.debug("Preparing connector initialization");
        String string = this.config.getString(AsyncEngineConfig.ENGINE_NAME);
        Map<String, String> validateAndGetConnectorConfig = validateAndGetConnectorConfig(this.connector.connectConnector(), this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS));
        LOGGER.debug("Initializing offset store, offset reader and writer");
        OffsetBackingStore createAndStartOffsetStore = createAndStartOffsetStore(validateAndGetConnectorConfig);
        OffsetStorageReaderImpl offsetStorageReaderImpl = new OffsetStorageReaderImpl(createAndStartOffsetStore, string, this.offsetKeyConverter, this.offsetValueConverter);
        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(createAndStartOffsetStore, string, this.offsetKeyConverter, this.offsetValueConverter);
        LOGGER.debug("Initializing Connect connector itself");
        this.connector.initialize(new EngineSourceConnectorContext(this, createAndStartOffsetStore, offsetStorageReaderImpl, offsetStorageWriter));
        return validateAndGetConnectorConfig;
    }

    private void createSourceTasks(EngineSourceConnector engineSourceConnector, List<EngineSourceTask> list) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        Class taskClass = engineSourceConnector.connectConnector().taskClass();
        List<Map> taskConfigs = engineSourceConnector.connectConnector().taskConfigs(this.config.getInteger("tasks.max", 1));
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Following task configurations will be used for creating tasks:");
            for (int i = 0; i < taskConfigs.size(); i++) {
                LOGGER.debug("Config #{}: {}", Integer.valueOf(i), taskConfigs.get(i));
            }
        }
        if (taskConfigs.size() < 1) {
            LOGGER.warn("No task configuration provided.");
        } else {
            LOGGER.debug("Creating {} instance(s) of source task(s)", Integer.valueOf(taskConfigs.size()));
        }
        for (Map map : taskConfigs) {
            SourceTask sourceTask = (SourceTask) taskClass.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            EngineSourceTaskContext engineSourceTaskContext = new EngineSourceTaskContext(map, engineSourceConnector.context().offsetStorageReader(), engineSourceConnector.context().offsetStorageWriter(), this.offsetCommitPolicy, this.clock, this.transformations);
            sourceTask.initialize(engineSourceTaskContext);
            list.add(new EngineSourceTask(sourceTask, engineSourceTaskContext));
        }
    }

    private void startSourceTasks(List<EngineSourceTask> list) throws Exception {
        Future poll;
        LOGGER.debug("Starting source connector tasks.");
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.taskService);
        for (EngineSourceTask engineSourceTask : list) {
            executorCompletionService.submit(() -> {
                engineSourceTask.connectTask().start(engineSourceTask.context().config());
                return null;
            });
        }
        long j = this.config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS);
        LOGGER.info("Waiting max. for {} ms for individual source tasks to start.", Long.valueOf(j));
        int size = list.size();
        Exception exc = null;
        int i = 0;
        for (int i2 = 0; i2 < size; i2++) {
            try {
                poll = executorCompletionService.poll(j, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                LOGGER.debug("Task #{} (out of {} tasks) failed to start. Failed with", new Object[]{Integer.valueOf(i2 + 1), Integer.valueOf(size), e});
                i++;
                if (exc == null) {
                    exc = e;
                }
            }
            if (poll == null) {
                throw new InterruptedException("Time out while waiting for source task to start.");
                break;
            }
            poll.get();
            LOGGER.debug("Started task #{} out of {} tasks.", Integer.valueOf(i2 + 1), Integer.valueOf(size));
            LOGGER.debug("Calling connector callback after task is started.");
            this.connectorCallback.ifPresent((v0) -> {
                v0.taskStarted();
            });
        }
        if (exc != null) {
            LOGGER.error("{} task(s) out of {} failed to start.", Integer.valueOf(i), Integer.valueOf(size));
            throw exc;
        }
        LOGGER.info("All tasks have started successfully.");
    }

    private void runTasksPolling(List<EngineSourceTask> list) throws ExecutionException {
        LOGGER.debug("Starting tasks polling.");
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.taskService);
        String selectRecordProcessor = selectRecordProcessor();
        for (EngineSourceTask engineSourceTask : list) {
            RecordProcessor createRecordProcessor = createRecordProcessor(selectRecordProcessor, engineSourceTask);
            createRecordProcessor.initialize(this.recordService, this.transformations);
            this.pollingFutures.add(executorCompletionService.submit(new PollRecords(engineSourceTask, createRecordProcessor, this.state)));
        }
        for (int i = 0; i < list.size(); i++) {
            try {
                executorCompletionService.take().get();
            } catch (InterruptedException | CancellationException e) {
                LOGGER.info("Task interrupted while polling.");
            }
            LOGGER.debug("Task #{} out of {} tasks has stopped polling.", Integer.valueOf(i), Integer.valueOf(list.size()));
        }
    }

    private String selectRecordProcessor() {
        if (this.handler != null && this.recordConverter == null) {
            LOGGER.info("Using {} processor", ParallelSmtBatchProcessor.class.getName());
            return ParallelSmtBatchProcessor.class.getName();
        }
        if (this.handler != null && this.recordConverter != null) {
            LOGGER.info("Using {} processor", ParallelSmtAndConvertBatchProcessor.class.getName());
            return ParallelSmtAndConvertBatchProcessor.class.getName();
        }
        RecordProcessingOrder parse = RecordProcessingOrder.parse(this.config.getString(AsyncEngineConfig.RECORD_PROCESSING_ORDER));
        if (parse == RecordProcessingOrder.ORDERED && this.recordConverter == null) {
            LOGGER.info("Using {} processor", ParallelSmtConsumerProcessor.class.getName());
            return ParallelSmtConsumerProcessor.class.getName();
        }
        if (parse == RecordProcessingOrder.ORDERED && this.recordConverter != null) {
            LOGGER.info("Using {} processor", ParallelSmtAndConvertConsumerProcessor.class.getName());
            return ParallelSmtAndConvertConsumerProcessor.class.getName();
        }
        if (parse == RecordProcessingOrder.UNORDERED && this.recordConverter == null) {
            LOGGER.info("Using {} processor", ParallelSmtAsyncConsumerProcessor.class.getName());
            return ParallelSmtAsyncConsumerProcessor.class.getName();
        }
        if (parse != RecordProcessingOrder.UNORDERED || this.recordConverter == null) {
            throw new IllegalStateException("Unable to select RecordProcessor, this should never happen.");
        }
        LOGGER.info("Using {} processor", ParallelSmtAndConvertAsyncConsumerProcessor.class.getName());
        return ParallelSmtAndConvertAsyncConsumerProcessor.class.getName();
    }

    private RecordProcessor createRecordProcessor(String str, EngineSourceTask engineSourceTask) {
        if (ParallelSmtBatchProcessor.class.getName().equals(str)) {
            return new ParallelSmtBatchProcessor(new SourceRecordCommitter(engineSourceTask), this.handler);
        }
        if (ParallelSmtAndConvertBatchProcessor.class.getName().equals(str)) {
            return new ParallelSmtAndConvertBatchProcessor(new ConvertingRecordCommitter(engineSourceTask), this.handler, this.recordConverter);
        }
        if (ParallelSmtConsumerProcessor.class.getName().equals(str)) {
            return new ParallelSmtConsumerProcessor(new SourceRecordCommitter(engineSourceTask), this.consumer);
        }
        if (ParallelSmtAndConvertConsumerProcessor.class.getName().equals(str)) {
            return new ParallelSmtAndConvertConsumerProcessor(new SourceRecordCommitter(engineSourceTask), this.consumer, this.recordConverter);
        }
        if (ParallelSmtAsyncConsumerProcessor.class.getName().equals(str)) {
            return new ParallelSmtAsyncConsumerProcessor(new SourceRecordCommitter(engineSourceTask), this.consumer);
        }
        if (ParallelSmtAndConvertAsyncConsumerProcessor.class.getName().equals(str)) {
            return new ParallelSmtAndConvertAsyncConsumerProcessor(new SourceRecordCommitter(engineSourceTask), this.consumer, this.recordConverter);
        }
        throw new IllegalStateException("Unable to create RecordProcessor instance, this should never happen.");
    }

    private void stopRecordService() {
        LOGGER.debug("Stopping records service.");
        long j = this.config.getLong(AsyncEngineConfig.RECORD_PROCESSING_SHUTDOWN_TIMEOUT_MS);
        try {
            try {
                this.recordService.shutdown();
                this.recordService.awaitTermination(j, TimeUnit.MILLISECONDS);
                this.recordService.shutdownNow();
            } catch (InterruptedException e) {
                LOGGER.info("Timed out while waiting for record service shutdown. Shutting it down immediately.");
                this.recordService.shutdownNow();
            }
        } catch (Throwable th) {
            this.recordService.shutdownNow();
            throw th;
        }
    }

    private void stopPollingIfNeeded() {
        for (Future<Void> future : this.pollingFutures) {
            if (!future.isDone()) {
                future.cancel(true);
            }
        }
    }

    private void stopSourceTasks(List<EngineSourceTask> list) {
        try {
            try {
                LOGGER.debug("Stopping source connector tasks.");
                ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.taskService);
                for (EngineSourceTask engineSourceTask : list) {
                    long j = Configuration.from(engineSourceTask.context().config()).getLong(EmbeddedEngineConfig.OFFSET_COMMIT_TIMEOUT_MS);
                    executorCompletionService.submit(() -> {
                        LOGGER.debug("Committing task's offset.");
                        commitOffsets(engineSourceTask.context().offsetStorageWriter(), engineSourceTask.context().clock(), j, engineSourceTask.connectTask());
                        LOGGER.debug("Stopping Connect task.");
                        engineSourceTask.connectTask().stop();
                        return null;
                    });
                }
                long j2 = this.config.getLong(AsyncEngineConfig.TASK_MANAGEMENT_TIMEOUT_MS);
                LOGGER.debug("Waiting max. for {} ms for individual source tasks to stop.", Long.valueOf(j2));
                int size = list.size();
                long nanoTime = System.nanoTime();
                for (int i = 0; i < size; i++) {
                    Future poll = executorCompletionService.poll(j2, TimeUnit.MILLISECONDS);
                    if (poll == null) {
                        throw new InterruptedException("Time out while waiting for source task to stop.");
                    }
                    poll.get(0L, TimeUnit.MILLISECONDS);
                    LOGGER.info("Stopped task #{} out of {} tasks (it took {} ms to stop the task).", new Object[]{Integer.valueOf(i + 1), Integer.valueOf(size), Long.valueOf((System.nanoTime() - nanoTime) / 1000000)});
                    LOGGER.debug("Calling connector callback after task is stopped.");
                    this.connectorCallback.ifPresent((v0) -> {
                        v0.taskStopped();
                    });
                }
                LOGGER.debug("Stopping all remaining tasks if there are any.");
                this.taskService.shutdown();
                this.taskService.shutdownNow();
            } catch (InterruptedException e) {
                LOGGER.warn("Stopping of the tasks was interrupted, shutting down immediately.");
                this.taskService.shutdownNow();
            } catch (Exception e2) {
                LOGGER.warn("Failure during stopping tasks, stopping them immediately. Failed with ", e2);
                this.taskService.shutdownNow();
            }
        } catch (Throwable th) {
            this.taskService.shutdownNow();
            throw th;
        }
    }

    private void stopOffsetStore(DebeziumSourceConnectorContext debeziumSourceConnectorContext) {
        if (debeziumSourceConnectorContext == null || debeziumSourceConnectorContext.offsetStore() == null) {
            LOGGER.debug("Offset store hasn't been initialized yet, closing of the offset store is skipped.");
            return;
        }
        LOGGER.debug("Stopping offset backing store.");
        try {
            debeziumSourceConnectorContext.offsetStore().stop();
        } catch (Exception e) {
            LOGGER.warn("Failed to stop offset backing store", e);
        }
    }

    private void stopConnector(List<EngineSourceTask> list, State state) {
        if (State.shouldStopTasks(state)) {
            LOGGER.debug("Tasks were already started, stopping record service and tasks.");
            stopRecordService();
            stopPollingIfNeeded();
            stopSourceTasks(list);
        }
        stopOffsetStore(this.connector.context());
        LOGGER.debug("Stopping the connector.");
        this.connector.connectConnector().stop();
        LOGGER.debug("Calling connector callback after connector stop");
        this.connectorCallback.ifPresent((v0) -> {
            v0.connectorStopped();
        });
    }

    private void callCompletionHandler(Throwable th) {
        if (th == null) {
            this.completionCallback.handle(true, String.format("Connector '%s' completed normally.", this.config.getString(AsyncEngineConfig.CONNECTOR_CLASS)), (Throwable) null);
        } else {
            this.completionCallback.handle(false, th.getMessage(), th instanceof ExecutionException ? th.getCause() : th);
        }
    }

    private State getEngineState() {
        return this.state.get();
    }

    private void setEngineState(State state, State state2) {
        if (!this.state.compareAndSet(state, state2)) {
            throw new IllegalStateException(String.format("Cannot change engine state to '%s' as the engine is not in expected state '%s', current engine state is '%s'", state2, state, this.state.get()));
        }
        LOGGER.info("Engine state has changed from '{}' to '{}'", state, state2);
    }

    private Map<String, String> validateAndGetConnectorConfig(SourceConnector sourceConnector, String str) {
        LOGGER.debug("Validating provided connector configuration.");
        Map<String, String> originalsStrings = this.workerConfig.originalsStrings();
        ConfigInfos generateResult = AbstractHerder.generateResult(str, Collections.emptyMap(), sourceConnector.validate(originalsStrings).configValues(), sourceConnector.config().groups());
        if (generateResult.errorCount() > 0) {
            throw new DebeziumException("Connector configuration is not valid. " + ((String) generateResult.values().stream().flatMap(configInfo -> {
                return configInfo.configValue().errors().stream();
            }).collect(Collectors.joining(" "))));
        }
        LOGGER.debug("Connector configuration is valid.");
        return originalsStrings;
    }

    private OffsetBackingStore createAndStartOffsetStore(Map<String, String> map) throws Exception {
        String string = this.config.getString(AsyncEngineConfig.OFFSET_STORAGE);
        LOGGER.debug("Creating instance of offset store for {}.", string);
        MemoryOffsetBackingStore memoryOffsetBackingStore = string.equals(MemoryOffsetBackingStore.class.getName()) ? KafkaConnectUtil.memoryOffsetBackingStore() : string.equals(FileOffsetBackingStore.class.getName()) ? KafkaConnectUtil.fileOffsetBackingStore() : string.equals(KafkaOffsetBackingStore.class.getName()) ? KafkaConnectUtil.kafkaOffsetBackingStore(map) : (OffsetBackingStore) this.classLoader.loadClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
        try {
            LOGGER.debug("Starting offset store.");
            memoryOffsetBackingStore.configure(this.workerConfig);
            memoryOffsetBackingStore.start();
            LOGGER.debug("Offset store {} successfully started.", string);
            return memoryOffsetBackingStore;
        } catch (Throwable th) {
            LOGGER.debug("Failed to start offset store, stopping it now.");
            memoryOffsetBackingStore.stop();
            throw th;
        }
    }

    private static boolean commitOffsets(OffsetStorageWriter offsetStorageWriter, Clock clock, long j, SourceTask sourceTask) throws InterruptedException, TimeoutException {
        long currentTimeInMillis = clock.currentTimeInMillis() + j;
        if (!offsetStorageWriter.beginFlush(j, TimeUnit.MICROSECONDS)) {
            LOGGER.trace("No offset to be committed.");
            return false;
        }
        Future doFlush = offsetStorageWriter.doFlush((th, r2) -> {
        });
        if (doFlush == null) {
            LOGGER.warn("Flushing process probably failed, please check previous log for more details.");
            return false;
        }
        try {
            doFlush.get(Math.max(currentTimeInMillis - clock.currentTimeInMillis(), 0L), TimeUnit.MILLISECONDS);
            sourceTask.commit();
            return true;
        } catch (InterruptedException e) {
            LOGGER.debug("Flush of the offsets interrupted, canceling the flush.");
            offsetStorageWriter.cancelFlush();
            throw e;
        } catch (ExecutionException | TimeoutException e2) {
            LOGGER.warn("Flush of the offsets failed, canceling the flush.", e2);
            offsetStorageWriter.cancelFlush();
            return false;
        }
    }

    public DebeziumEngine.Signaler getSignaler() {
        if (this.signaler == null) {
            this.signaler = new EmbeddedEngineSignaler(tasks().stream().map((v0) -> {
                return v0.signalChannelWriter();
            }).flatMap((v0) -> {
                return v0.stream();
            }).toList());
        }
        return this.signaler;
    }

    private static DebeziumEngine.ChangeConsumer<SourceRecord> buildDefaultChangeConsumer(final Consumer<SourceRecord> consumer) {
        return new DebeziumEngine.ChangeConsumer<SourceRecord>() { // from class: io.debezium.embedded.async.AsyncEmbeddedEngine.1
            public void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {
                for (SourceRecord sourceRecord : list) {
                    try {
                        consumer.accept(sourceRecord);
                        recordCommitter.markProcessed(sourceRecord);
                    } catch (StopEngineException e) {
                        recordCommitter.markProcessed(sourceRecord);
                        throw e;
                    }
                }
                recordCommitter.markBatchFinished();
            }
        };
    }

    private static DebeziumEngine.ChangeConsumer buildConvertingChangeConsumer(final Consumer consumer, final Function<SourceRecord, ?> function) {
        return new DebeziumEngine.ChangeConsumer<SourceRecord>() { // from class: io.debezium.embedded.async.AsyncEmbeddedEngine.2
            public void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {
                for (SourceRecord sourceRecord : list) {
                    try {
                        consumer.accept(function.apply(sourceRecord));
                        recordCommitter.markProcessed(sourceRecord);
                    } catch (StopEngineException e) {
                        recordCommitter.markProcessed(sourceRecord);
                        throw e;
                    }
                }
                recordCommitter.markBatchFinished();
            }
        };
    }

    private int computeRecordThreads(String str) {
        ProcessingCores parse = ProcessingCores.parse(str);
        if (parse != null) {
            return parse.getCores();
        }
        int intValue = Integer.valueOf(str).intValue();
        if (intValue <= 0) {
            throw new IllegalArgumentException("Number of cores cannot be negative or zero!");
        }
        return intValue;
    }
}
