/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.file;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStreamSourceTask
extends SourceTask {
    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
    public static final String FILENAME_FIELD = "filename";
    public static final String POSITION_FIELD = "position";
    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
    private String filename;
    private InputStream stream;
    private BufferedReader reader = null;
    private char[] buffer;
    private int offset = 0;
    private String topic;
    private int batchSize;
    private Long streamOffset;

    public FileStreamSourceTask() {
        this(1024);
    }

    FileStreamSourceTask(int initialBufferSize) {
        this.buffer = new char[initialBufferSize];
    }

    @Override
    public String version() {
        return new FileStreamSourceConnector().version();
    }

    @Override
    public void start(Map<String, String> props) {
        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, props);
        this.filename = config.getString("file");
        if (this.filename == null || this.filename.isEmpty()) {
            this.stream = System.in;
            this.streamOffset = null;
            this.reader = new BufferedReader(new InputStreamReader(this.stream, StandardCharsets.UTF_8));
        }
        this.topic = config.getString("topic");
        this.batchSize = config.getInt("batch.size");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        Object lastRecordedOffset2;
        if (this.stream == null) {
            try {
                this.stream = Files.newInputStream(Paths.get(this.filename, new String[0]), new OpenOption[0]);
                Map<String, Object> offset = this.context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, this.filename));
                if (offset != null) {
                    lastRecordedOffset2 = offset.get(POSITION_FIELD);
                    if (lastRecordedOffset2 != null && !(lastRecordedOffset2 instanceof Long)) {
                        throw new ConnectException("Offset position is the incorrect type");
                    }
                    if (lastRecordedOffset2 != null) {
                        long skipped;
                        log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset2);
                        for (long skipLeft = ((Long)lastRecordedOffset2).longValue(); skipLeft > 0L; skipLeft -= skipped) {
                            try {
                                skipped = this.stream.skip(skipLeft);
                                continue;
                            }
                            catch (IOException e) {
                                log.error("Error while trying to seek to previous offset in file {}: ", (Object)this.filename, (Object)e);
                                throw new ConnectException(e);
                            }
                        }
                        log.debug("Skipped to offset {}", lastRecordedOffset2);
                    }
                    this.streamOffset = lastRecordedOffset2 != null ? (Long)lastRecordedOffset2 : 0L;
                } else {
                    this.streamOffset = 0L;
                }
                this.reader = new BufferedReader(new InputStreamReader(this.stream, StandardCharsets.UTF_8));
                log.debug("Opened {} for reading", (Object)this.logFilename());
            }
            catch (NoSuchFileException e) {
                log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", (Object)this.logFilename());
                FileStreamSourceTask lastRecordedOffset2 = this;
                synchronized (lastRecordedOffset2) {
                    this.wait(1000L);
                }
                return null;
            }
            catch (IOException e) {
                log.error("Error while trying to open file {}: ", (Object)this.filename, (Object)e);
                throw new ConnectException(e);
            }
        }
        try {
            BufferedReader readerCopy;
            lastRecordedOffset2 = this;
            synchronized (lastRecordedOffset2) {
                readerCopy = this.reader;
            }
            if (readerCopy == null) {
                return null;
            }
            ArrayList<SourceRecord> records = null;
            int nread = 0;
            while (readerCopy.ready()) {
                String line;
                nread = readerCopy.read(this.buffer, this.offset, this.buffer.length - this.offset);
                log.trace("Read {} bytes from {}", (Object)nread, (Object)this.logFilename());
                if (nread <= 0) continue;
                this.offset += nread;
                boolean foundOneLine = false;
                do {
                    if ((line = this.extractLine()) == null) continue;
                    foundOneLine = true;
                    log.trace("Read a line from {}", (Object)this.logFilename());
                    if (records == null) {
                        records = new ArrayList<SourceRecord>();
                    }
                    records.add(new SourceRecord(this.offsetKey(this.filename), this.offsetValue(this.streamOffset), this.topic, null, null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
                    if (records.size() < this.batchSize) continue;
                    return records;
                } while (line != null);
                if (foundOneLine || this.offset != this.buffer.length) continue;
                char[] newbuf = new char[this.buffer.length * 2];
                System.arraycopy(this.buffer, 0, newbuf, 0, this.buffer.length);
                log.info("Increased buffer from {} to {}", (Object)this.buffer.length, (Object)newbuf.length);
                this.buffer = newbuf;
            }
            if (nread <= 0) {
                FileStreamSourceTask fileStreamSourceTask = this;
                synchronized (fileStreamSourceTask) {
                    this.wait(1000L);
                }
            }
            return records;
        }
        catch (IOException iOException) {
            return null;
        }
    }

    private String extractLine() {
        int until = -1;
        int newStart = -1;
        for (int i = 0; i < this.offset; ++i) {
            if (this.buffer[i] == '\n') {
                until = i;
                newStart = i + 1;
                break;
            }
            if (this.buffer[i] != '\r') continue;
            if (i + 1 >= this.offset) {
                return null;
            }
            until = i;
            newStart = this.buffer[i + 1] == '\n' ? i + 2 : i + 1;
            break;
        }
        if (until != -1) {
            String result = new String(this.buffer, 0, until);
            System.arraycopy(this.buffer, newStart, this.buffer, 0, this.buffer.length - newStart);
            this.offset -= newStart;
            if (this.streamOffset != null) {
                this.streamOffset = this.streamOffset + (long)newStart;
            }
            return result;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        log.trace("Stopping");
        FileStreamSourceTask fileStreamSourceTask = this;
        synchronized (fileStreamSourceTask) {
            try {
                if (this.stream != null && this.stream != System.in) {
                    this.stream.close();
                    log.trace("Closed input stream");
                }
            }
            catch (IOException e) {
                log.error("Failed to close FileStreamSourceTask stream: ", e);
            }
            this.notify();
        }
    }

    private Map<String, String> offsetKey(String filename) {
        return Collections.singletonMap(FILENAME_FIELD, filename);
    }

    private Map<String, Long> offsetValue(Long pos) {
        return Collections.singletonMap(POSITION_FIELD, pos);
    }

    private String logFilename() {
        return this.filename == null ? "stdin" : this.filename;
    }

    int bufferSize() {
        return this.buffer.length;
    }
}

