/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.simple;

import io.debezium.config.Configuration;
import io.debezium.util.Collect;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/**
 * A very simple {@link SourceConnector} that produces a fixed, pre-computed series of records.
 *
 * <p>The connector always hands out exactly one task configuration (a pass-through of the
 * properties it was started with). The task generates {@code batch.count} batches of
 * {@code record.count.per.batch} records each, numbered with a monotonically increasing
 * {@code id} starting at 1, and skips any ids at or below the last recorded offset.
 */
public class SimpleSourceConnector extends SourceConnector {
    protected static final String VERSION = "1.0";
    /** Property naming the topic the generated records are addressed to. */
    public static final String TOPIC_NAME = "topic.name";
    /** Property giving the number of records in each batch. */
    public static final String RECORD_COUNT_PER_BATCH = "record.count.per.batch";
    /** Property giving the number of batches to generate. */
    public static final String BATCH_COUNT = "batch.count";
    public static final String DEFAULT_TOPIC_NAME = "simple.topic";
    /** Property controlling whether the {@code timestamp} value field is populated. */
    public static final String INCLUDE_TIMESTAMP = "include.timestamp";
    public static final int DEFAULT_RECORD_COUNT_PER_BATCH = 1;
    public static final int DEFAULT_BATCH_COUNT = 10;
    public static final boolean DEFAULT_INCLUDE_TIMESTAMP = false;

    /** The properties passed to {@link #start(Map)}; forwarded unchanged to the task. */
    private Map<String, String> config;

    @Override
    public String version() {
        return VERSION;
    }

    /**
     * Remembers the supplied properties so they can be handed to the task.
     *
     * @param props the connector properties
     */
    @Override
    public void start(Map<String, String> props) {
        this.config = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return SimpleConnectorTask.class;
    }

    /**
     * Returns a single task configuration regardless of {@code maxTasks}.
     *
     * @param maxTasks ignored; exactly one configuration is always returned
     * @return a one-element list containing the properties given to {@link #start(Map)}
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        configs.add(this.config);
        return configs;
    }

    @Override
    public void stop() {
        // Nothing to release: the connector holds no resources of its own.
    }

    /**
     * Returns an empty configuration definition.
     *
     * @return a non-null, empty {@link ConfigDef}; this connector declares no validated keys
     */
    @Override
    public ConfigDef config() {
        // The Connector contract requires a non-null ConfigDef, so return an empty one
        // rather than null.
        return new ConfigDef();
    }

    /**
     * The task that pre-computes all records in {@link #start(Map)} and hands them out in
     * batches of at most {@code record.count.per.batch} from {@link #poll()}.
     */
    public static class SimpleConnectorTask extends SourceTask {
        /** Name of the field used both in the record key and in the source offset. */
        private static final String ID_FIELD = "id";

        private int recordsPerBatch;
        private Queue<SourceRecord> records;
        private final AtomicBoolean running = new AtomicBoolean();

        @Override
        public String version() {
            return SimpleSourceConnector.VERSION;
        }

        /**
         * Reads the configuration and pre-generates every record that has an id greater than
         * the last stored offset. Does nothing if the task is already running.
         *
         * @param props the task properties
         */
        @Override
        public void start(Map<String, String> props) {
            if (!this.running.compareAndSet(false, true)) {
                // Already started; keep the existing queue.
                return;
            }
            Configuration config = Configuration.from(props);
            this.recordsPerBatch = config.getInteger(SimpleSourceConnector.RECORD_COUNT_PER_BATCH,
                    SimpleSourceConnector.DEFAULT_RECORD_COUNT_PER_BATCH);
            int batchCount = config.getInteger(SimpleSourceConnector.BATCH_COUNT,
                    SimpleSourceConnector.DEFAULT_BATCH_COUNT);
            String topic = config.getString(SimpleSourceConnector.TOPIC_NAME,
                    SimpleSourceConnector.DEFAULT_TOPIC_NAME);
            boolean includeTimestamp = config.getBoolean(SimpleSourceConnector.INCLUDE_TIMESTAMP,
                    SimpleSourceConnector.DEFAULT_INCLUDE_TIMESTAMP);

            Map<String, String> partition = Collect.hashMapOf("source", "simple");
            Schema keySchema = SchemaBuilder.struct()
                    .name("simple.key")
                    .field(ID_FIELD, Schema.INT32_SCHEMA)
                    .build();
            Schema valueSchema = SchemaBuilder.struct()
                    .name("simple.value")
                    .field("batch", Schema.INT32_SCHEMA)
                    .field("record", Schema.INT32_SCHEMA)
                    .field("timestamp", Schema.OPTIONAL_INT64_SCHEMA)
                    .build();

            long lastId = lastRecordedId(this.context.offsetStorageReader().offset(partition));

            this.records = new LinkedList<>();
            long initialTimestamp = System.currentTimeMillis();
            int id = 0;
            // '<' rather than '!=' so that a negative count yields no records instead of
            // looping until the counter wraps around.
            for (int batch = 0; batch < batchCount; ++batch) {
                for (int recordNum = 0; recordNum < this.recordsPerBatch; ++recordNum) {
                    ++id;
                    if (id <= lastId) {
                        // Already emitted according to the stored offset.
                        continue;
                    }
                    if (!this.running.get()) {
                        // Stopped while generating; abandon the remaining records.
                        return;
                    }
                    Map<String, Integer> offset = Collect.hashMapOf(ID_FIELD, id);
                    Struct key = new Struct(keySchema);
                    key.put(ID_FIELD, id);
                    Struct value = new Struct(valueSchema);
                    value.put("batch", batch + 1);
                    value.put("record", recordNum + 1);
                    if (includeTimestamp) {
                        // Synthetic, strictly increasing timestamp derived from the start time.
                        value.put("timestamp", initialTimestamp + id);
                    }
                    this.records.add(new SourceRecord(partition, offset, topic, 1,
                            keySchema, key, valueSchema, value));
                }
            }
        }

        /**
         * Extracts the last emitted id from a stored offset.
         *
         * <p>This task writes the id as an {@code Integer}, and the stored value may be read
         * back as a different numeric type, so it is read as a {@link Number} rather than cast
         * to one specific boxed type.
         *
         * @param lastOffset the stored offset, or {@code null} if none exists
         * @return the stored id, or 0 if there is no offset or it has no id entry
         */
        private static long lastRecordedId(Map<String, Object> lastOffset) {
            if (lastOffset == null) {
                return 0L;
            }
            Object storedId = lastOffset.get(ID_FIELD);
            return storedId == null ? 0L : ((Number) storedId).longValue();
        }

        /**
         * Returns the next batch of pre-generated records.
         *
         * @return up to {@code record.count.per.batch} records, or {@code null} if the task
         *         has been stopped
         * @throws InterruptedException if the thread is interrupted while blocked because no
         *         records remain
         */
        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            if (this.records.isEmpty()) {
                // Nothing left to emit. Nothing else holds this latch, so await() can only
                // end by interruption (presumably when the task's thread is shut down).
                new CountDownLatch(1).await();
            }
            if (!this.running.get()) {
                return null;
            }
            List<SourceRecord> results = new ArrayList<>();
            // Bound each poll by the configured batch size.
            while (results.size() < this.recordsPerBatch && !this.records.isEmpty()) {
                results.add(this.records.poll());
            }
            return results;
        }

        /** Marks the task as stopped; subsequent {@link #poll()} calls return {@code null}. */
        @Override
        public void stop() {
            this.running.set(false);
        }
    }
}

