package io.aiven.kafka.connect.gcs;

import com.github.luben.zstd.ZstdOutputStream;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import io.aiven.kafka.connect.common.config.CompressionType;
import io.aiven.kafka.connect.common.config.FormatType;
import io.aiven.kafka.connect.common.grouper.RecordGrouper;
import io.aiven.kafka.connect.common.grouper.RecordGrouperFactory;
import io.aiven.kafka.connect.common.output.OutputWriter;
import io.aiven.kafka.connect.common.output.jsonwriter.JsonLinesOutputWriter;
import io.aiven.kafka.connect.common.output.jsonwriter.JsonOutputWriter;
import io.aiven.kafka.connect.common.output.plainwriter.PlainOutputWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.SnappyOutputStream;

/**
 * Kafka Connect sink task that collects incoming records in a {@link RecordGrouper} and, on
 * every flush, uploads each group as one blob to the configured Google Cloud Storage bucket.
 *
 * <p>Each blob is serialised by the {@link OutputWriter} that matches the configured
 * {@link FormatType} and is wrapped in the compression stream that matches the configured
 * {@link CompressionType}.
 */
public final class GcsSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(GcsSinkTask.class);

    private RecordGrouper recordGrouper;
    private GcsSinkConfig config;
    private Storage storage;

    /**
     * Creates an unconfigured task; configuration, storage client and record grouper are set up
     * later by {@link #start(Map)}.
     */
    public GcsSinkTask() {
    }

    /**
     * Creates a fully initialised task that uses an externally supplied storage client instead
     * of building one from the configured credentials.
     *
     * @param props   the task configuration properties; must not be {@code null}
     * @param storage the storage client used for uploads; must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     * @throws ConnectException     if no record grouper can be created for the configuration
     */
    public GcsSinkTask(final Map<String, String> props, final Storage storage) {
        Objects.requireNonNull(props, "props cannot be null");
        Objects.requireNonNull(storage, "storage cannot be null");
        this.config = new GcsSinkConfig(props);
        this.storage = storage;
        initRest();
    }

    /**
     * Parses the configuration, builds a storage client from the configured credentials and
     * creates the record grouper.
     *
     * @param props the task configuration properties; must not be {@code null}
     * @throws NullPointerException if {@code props} is {@code null}
     * @throws ConnectException     if no record grouper can be created for the configuration
     */
    public void start(final Map<String, String> props) {
        Objects.requireNonNull(props, "props cannot be null");
        this.config = new GcsSinkConfig(props);
        this.storage = StorageOptions.newBuilder()
                .setCredentials(this.config.getCredentials())
                .build()
                .getService();
        initRest();
    }

    /**
     * Creates the record grouper for the current configuration.
     *
     * @throws ConnectException wrapping any exception raised by the grouper factory
     */
    private void initRest() {
        try {
            this.recordGrouper = RecordGrouperFactory.newRecordGrouper(this.config);
        } catch (final Exception e) {
            throw new ConnectException("Unsupported file name template " + this.config.getFilename(), e);
        }
    }

    /**
     * Creates the writer that serialises records in the configured output format.
     *
     * @param outputStream the stream the writer emits its output to
     * @return a writer for the configured {@link FormatType}
     * @throws ConnectException if the configured format type has no writer here
     */
    private OutputWriter getOutputWriter(final OutputStream outputStream) {
        final FormatType formatType = this.config.getFormatType();
        switch (formatType) {
            case CSV:
                return new PlainOutputWriter(this.config.getOutputFields(), outputStream);
            case JSONL:
                return new JsonLinesOutputWriter(this.config.getOutputFields(), outputStream);
            case JSON:
                return new JsonOutputWriter(this.config.getOutputFields(), outputStream);
            default:
                throw new ConnectException("Unsupported format type " + formatType);
        }
    }

    /**
     * Hands the given records to the record grouper; nothing is uploaded until
     * {@link #flush(Map)} is called.
     *
     * @param records the records to buffer; must not be {@code null}
     * @throws NullPointerException if {@code records} is {@code null}
     */
    public void put(final Collection<SinkRecord> records) {
        Objects.requireNonNull(records, "records cannot be null");
        log.debug("Processing {} records", records.size());
        for (final SinkRecord sinkRecord : records) {
            this.recordGrouper.put(sinkRecord);
        }
    }

    /**
     * Uploads every group currently held by the record grouper as its own blob and then clears
     * the grouper.
     *
     * @param currentOffsets the offsets supplied by the caller; not used by this implementation
     * @throws ConnectException if writing or uploading any group fails
     */
    public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        this.recordGrouper.records().forEach(this::flushFile);
        this.recordGrouper.clear();
    }

    /**
     * Serialises one group of records into memory and uploads the result as a single blob named
     * by the configured prefix followed by {@code filename}.
     *
     * @param filename the blob name without the configured prefix
     * @param records  the records to write into the blob
     * @throws ConnectException wrapping any exception raised while writing or uploading
     */
    private void flushFile(final String filename, final List<SinkRecord> records) {
        final BlobInfo blob = BlobInfo
                .newBuilder(this.config.getBucketName(), this.config.getPrefix() + filename)
                .build();
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream()) {
            // The writer and the compression stream must be closed before the buffer is read,
            // so that trailing data (format footers, compression trailers) reaches the buffer.
            try (OutputStream compressedStream = getCompressedStream(buffer);
                 OutputWriter writer = getOutputWriter(compressedStream)) {
                for (final SinkRecord sinkRecord : records) {
                    writer.writeRecord(sinkRecord);
                }
            }
            this.storage.create(blob, buffer.toByteArray());
        } catch (final Exception e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Wraps the given stream in the compressing stream for the configured compression type.
     *
     * @param outputStream the stream that receives the (possibly compressed) bytes; must not be
     *                     {@code null}
     * @return a Zstandard, GZIP or Snappy stream around {@code outputStream}, or
     *         {@code outputStream} itself for any other compression type
     * @throws IOException          if the compressing stream cannot be created
     * @throws NullPointerException if {@code outputStream} is {@code null}
     */
    private OutputStream getCompressedStream(final OutputStream outputStream) throws IOException {
        Objects.requireNonNull(outputStream, "outputStream cannot be null");
        final CompressionType compressionType = this.config.getCompressionType();
        switch (compressionType) {
            case ZSTD:
                return new ZstdOutputStream(outputStream);
            case GZIP:
                return new GZIPOutputStream(outputStream);
            case SNAPPY:
                return new SnappyOutputStream(outputStream);
            default:
                return outputStream;
        }
    }

    /**
     * Does nothing: this task performs no cleanup on stop.
     */
    public void stop() {
    }

    /**
     * Returns the version reported by this task.
     *
     * @return the value of {@code Version.VERSION}
     */
    public String version() {
        return Version.VERSION;
    }
}
