package org.apache.kafka.connect.file;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/file/FileStreamSourceTask.class */
/**
 * Source task that reads text from a file (or from standard input when no file is configured)
 * and emits one {@link SourceRecord} per line.
 *
 * <p>Lines are split manually on {@code \n}, {@code \r} or {@code \r\n} out of an internal
 * character buffer that doubles in size whenever it fills up without containing a complete line.
 * For files, the position reached so far is attached to every record as its source offset and is
 * used to skip ahead when the file is (re)opened.
 *
 * <p>{@link #poll()} and {@link #stop()} coordinate through this object's monitor:
 * {@code poll()} waits on it while backing off, and {@code stop()} calls {@code notify()} on it
 * after closing the stream.
 */
public class FileStreamSourceTask extends SourceTask {
    /** Key of the source-partition map; its value is the configured file name. */
    public static final String FILENAME_FIELD = "filename";
    /** Key of the source-offset map; its value is the position reached in the file. */
    public static final String POSITION_FIELD = "position";

    private static final Logger log = LoggerFactory.getLogger(FileStreamSourceTask.class);
    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;

    private String filename;
    private InputStream stream;
    private BufferedReader reader;
    /** Characters read from the stream that have not yet been emitted as complete lines. */
    private char[] buffer;
    /** Number of valid characters currently held at the front of {@link #buffer}. */
    private int offset;
    private String topic;
    private int batchSize;
    /** Position attached to emitted records; {@code null} when reading from standard input. */
    private Long streamOffset;

    /** Creates a task with an initial line buffer of 1024 characters. */
    public FileStreamSourceTask() {
        this(1024);
    }

    /**
     * Creates a task with a caller-chosen initial line buffer size.
     *
     * @param initialBufferSize initial capacity, in characters, of the line buffer
     */
    FileStreamSourceTask(int initialBufferSize) {
        this.reader = null;
        this.offset = 0;
        this.buffer = new char[initialBufferSize];
    }

    /**
     * Returns the version reported by {@link FileStreamSourceConnector}.
     *
     * @return the connector version string
     */
    @Override // org.apache.kafka.connect.connector.Task
    public String version() {
        return new FileStreamSourceConnector().version();
    }

    /**
     * Reads the {@code file}, {@code topic} and {@code batch.size} settings from the given
     * properties. When no file name is configured the task reads from {@code System.in} and
     * keeps no stream offset; otherwise the file is opened lazily by {@link #poll()}.
     *
     * @param map the task configuration
     */
    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void start(Map<String, String> map) {
        AbstractConfig config = new AbstractConfig(FileStreamSourceConnector.CONFIG_DEF, map);
        this.filename = config.getString("file");
        if (this.filename == null || this.filename.isEmpty()) {
            this.stream = System.in;
            // No offset is tracked for stdin: extractLine() only advances a non-null streamOffset.
            this.streamOffset = null;
            this.reader = new BufferedReader(new InputStreamReader(this.stream, StandardCharsets.UTF_8));
        }
        this.topic = config.getString("topic");
        this.batchSize = config.getInt("batch.size");
    }

    /**
     * Opens the configured file if it is not open yet, then reads whatever data is currently
     * available and converts every complete line into a record.
     *
     * @return the records read, at most roughly {@code batch.size} of them; {@code null} when
     *         the file does not exist yet, when no complete line was available, or when reading
     *         failed with an {@link IOException}
     * @throws ConnectException if the file cannot be opened, the stored offset has the wrong
     *         type, or skipping to the stored offset fails
     * @throws InterruptedException if the thread is interrupted while backing off
     */
    @Override // org.apache.kafka.connect.source.SourceTask
    public List<SourceRecord> poll() throws InterruptedException {
        if (this.stream == null) {
            try {
                openFileAndRestoreOffset();
            } catch (NoSuchFileException e) {
                log.warn("Couldn't find file {} for FileStreamSourceTask, sleeping to wait for it to be created", logFilename());
                // Back off for up to a second; stop() can cut the wait short via notify().
                synchronized (this) {
                    wait(1000L);
                }
                return null;
            } catch (IOException e) {
                log.error("Error while trying to open file {}: ", this.filename, e);
                throw new ConnectException(e);
            }
        }

        try {
            final BufferedReader readerCopy;
            synchronized (this) {
                readerCopy = this.reader;
            }
            if (readerCopy == null) {
                return null;
            }

            List<SourceRecord> records = null;
            int charsRead = 0;
            // Only read while ready() reports available data, and split lines by hand.
            while (readerCopy.ready()) {
                charsRead = readerCopy.read(this.buffer, this.offset, this.buffer.length - this.offset);
                log.trace("Read {} bytes from {}", charsRead, logFilename());
                if (charsRead <= 0) {
                    continue;
                }
                this.offset += charsRead;

                boolean foundOneLine = false;
                String line;
                while ((line = extractLine()) != null) {
                    foundOneLine = true;
                    log.trace("Read a line from {}", logFilename());
                    if (records == null) {
                        records = new ArrayList<>();
                    }
                    records.add(new SourceRecord(offsetKey(this.filename), offsetValue(this.streamOffset), this.topic,
                            null, null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
                    if (records.size() >= this.batchSize) {
                        // Any further buffered lines are picked up by the next poll().
                        return records;
                    }
                }

                // A full buffer without a single line terminator: the line is longer than the
                // buffer, so double the capacity and keep reading.
                if (!foundOneLine && this.offset == this.buffer.length) {
                    char[] grown = new char[this.buffer.length * 2];
                    System.arraycopy(this.buffer, 0, grown, 0, this.buffer.length);
                    log.info("Increased buffer from {} to {}", this.buffer.length, grown.length);
                    this.buffer = grown;
                }
            }

            // The last read produced nothing (or nothing was ready): back off before returning.
            if (charsRead <= 0) {
                synchronized (this) {
                    wait(1000L);
                }
            }
            return records;
        } catch (IOException ignored) {
            // Deliberately not propagated; the caller simply receives null.
            // NOTE(review): presumably this covers the stream being closed by stop() while a
            // poll is in flight - confirm before turning this into a hard failure.
            return null;
        }
    }

    /**
     * Opens the configured file, skips to the position previously stored for it (if any) and
     * creates the UTF-8 reader used by {@link #poll()}.
     *
     * @throws IOException if the file cannot be opened ({@link NoSuchFileException} when it does
     *         not exist)
     * @throws ConnectException if the stored position is not a {@code Long} or skipping fails
     */
    private void openFileAndRestoreOffset() throws IOException {
        this.stream = Files.newInputStream(Paths.get(this.filename));
        Map<String, Object> storedOffset = this.context.offsetStorageReader().offset(offsetKey(this.filename));

        long restoredPosition = 0L;
        if (storedOffset != null) {
            Object lastRecordedOffset = storedOffset.get(POSITION_FIELD);
            if (lastRecordedOffset != null) {
                if (!(lastRecordedOffset instanceof Long)) {
                    throw new ConnectException("Offset position is the incorrect type");
                }
                restoredPosition = (Long) lastRecordedOffset;
                log.debug("Found previous offset, trying to skip to file offset {}", lastRecordedOffset);
                skipBytes(restoredPosition);
                log.debug("Skipped to offset {}", lastRecordedOffset);
            }
        }
        this.streamOffset = restoredPosition;

        this.reader = new BufferedReader(new InputStreamReader(this.stream, StandardCharsets.UTF_8));
        log.debug("Opened {} for reading", logFilename());
    }

    /**
     * Skips the given number of bytes on the open stream, retrying until all have been skipped.
     *
     * @param count number of bytes to skip
     * @throws ConnectException if the underlying stream reports an I/O error
     */
    private void skipBytes(long count) {
        long skipLeft = count;
        // NOTE(review): this loop does not terminate if skip() keeps returning 0, e.g. when the
        // stored offset lies beyond the end of a truncated file - behaviour kept as-is.
        while (skipLeft > 0) {
            try {
                skipLeft -= this.stream.skip(skipLeft);
            } catch (IOException e) {
                log.error("Error while trying to seek to previous offset in file {}: ", this.filename, e);
                throw new ConnectException(e);
            }
        }
    }

    /**
     * Removes the first complete line from the front of the buffer and returns it without its
     * terminator. {@code \n}, {@code \r} and {@code \r\n} are all accepted as terminators.
     *
     * @return the line, or {@code null} if the buffered data contains no complete line yet
     */
    private String extractLine() {
        int until = -1;
        int newStart = -1;
        for (int i = 0; i < this.offset; i++) {
            char c = this.buffer[i];
            if (c == '\n') {
                until = i;
                newStart = i + 1;
                break;
            }
            if (c == '\r') {
                // Cannot tell "\r" from "\r\n" until the following character has been read.
                if (i + 1 >= this.offset) {
                    return null;
                }
                until = i;
                newStart = this.buffer[i + 1] == '\n' ? i + 2 : i + 1;
                // Stop scanning here; without this break the loop would never terminate.
                break;
            }
        }
        if (until == -1) {
            return null;
        }

        String line = new String(this.buffer, 0, until);
        // Shift the remaining characters to the front of the buffer.
        System.arraycopy(this.buffer, newStart, this.buffer, 0, this.buffer.length - newStart);
        this.offset -= newStart;
        if (this.streamOffset != null) {
            this.streamOffset += newStart;
        }
        return line;
    }

    /**
     * Closes the input stream, unless it is {@code System.in}, and wakes up a {@link #poll()}
     * that is waiting on this task's monitor. A failure to close is logged, not rethrown.
     */
    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public void stop() {
        log.trace("Stopping");
        synchronized (this) {
            try {
                if (this.stream != null && this.stream != System.in) {
                    this.stream.close();
                    log.trace("Closed input stream");
                }
            } catch (IOException e) {
                log.error("Failed to close FileStreamSourceTask stream: ", e);
            }
            notify();
        }
    }

    /** Builds the source-partition map for the given file name. */
    private Map<String, String> offsetKey(String str) {
        return Collections.singletonMap(FILENAME_FIELD, str);
    }

    /** Builds the source-offset map for the given position. */
    private Map<String, Long> offsetValue(Long l) {
        return Collections.singletonMap(POSITION_FIELD, l);
    }

    /** Returns the file name for log messages, or {@code "stdin"} when the file name is null. */
    private String logFilename() {
        return this.filename == null ? "stdin" : this.filename;
    }

    /** Returns the current capacity, in characters, of the line buffer. */
    int bufferSize() {
        return this.buffer.length;
    }
}
