package org.fluentd.kafka;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.komamitsu.fluency.Fluency;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fluentd/kafka/FluentdSinkTask.class */
/**
 * Kafka Connect sink task that converts each {@link SinkRecord} with a
 * {@link SinkRecordConverter} and emits the result through a {@link Fluency} client.
 *
 * <p>The client and the converter are created in {@link #start(Map)} and the client is
 * released in {@link #stop()}. Failures while emitting or flushing are reported to the
 * Connect framework as {@link ConnectException}s instead of being swallowed.
 */
public class FluentdSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(FluentdSinkTask.class);

    /**
     * Argument handed to {@code Fluency#waitUntilAllBufferFlushed} on shutdown.
     * NOTE(review): presumably a maximum wait in seconds - confirm against the Fluency API.
     */
    private static final int STOP_FLUSH_WAIT = 3;

    private Fluency fluency;
    private SinkRecordConverter converter;

    /**
     * Returns the connector version as reported by {@code VersionUtil}.
     *
     * @return the version string
     */
    public String version() {
        return VersionUtil.getVersion();
    }

    /**
     * Builds the Fluency client and the record converter from the task configuration.
     *
     * @param map raw task properties, parsed by {@link FluentdSinkConnectorConfig}
     * @throws ConnectException if creating the Fluency client fails with an {@link IOException}
     */
    public void start(Map<String, String> map) {
        FluentdSinkConnectorConfig config = new FluentdSinkConnectorConfig(map);
        Fluency.Config fluencyConfig = new Fluency.Config()
                .setMaxBufferSize(config.getFluentdClientMaxBufferSize())
                .setBufferChunkInitialSize(config.getFluentdClientBufferChunkInitialSize())
                .setBufferChunkRetentionSize(config.getFluentdClientBufferChunkRetentionSize())
                .setFlushIntervalMillis(config.getFluentdClientFlushInterval())
                .setAckResponseMode(config.getFluentdClientAckResponseMode())
                .setFileBackupDir(config.getFluentdClientFileBackupDir())
                .setWaitUntilBufferFlushed(config.getFluentdClientWaitUntilBufferFlushed())
                .setWaitUntilFlusherTerminated(config.getFluentdClientWaitUntilFlusherTerminated())
                .setJvmHeapBufferMode(Boolean.valueOf(config.getFluentdClientJvmHeapBufferMode()));
        try {
            this.fluency = Fluency.defaultFluency(config.getFluentdConnectAddresses(), fluencyConfig);
            this.converter = new SinkRecordConverter(config);
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Converts and emits every record of the batch, in iteration order.
     *
     * @param collection the records delivered by the framework; an empty batch is a no-op
     * @throws ConnectException if emitting a record fails with an {@link IOException};
     *         the remaining records of the batch are not emitted
     */
    public void put(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            if (log.isDebugEnabled()) {
                // A record may carry a null value; do not dereference it just for logging.
                Object value = sinkRecord.value();
                String valueClass = value == null ? null : value.getClass().getCanonicalName();
                log.debug("key: {}, value: {}, class: {}, schema: {}", sinkRecord.key(), value, valueClass, sinkRecord.valueSchema());
            }
            FluentdEventRecord eventRecord = this.converter.convert(sinkRecord);
            // Per-record output belongs at debug level; at info it floods the worker log.
            log.debug("{}", eventRecord);
            emit(eventRecord);
        }
    }

    /**
     * Emits one converted record, preferring its event time, then its timestamp, and
     * otherwise letting the client pick the time.
     */
    private void emit(FluentdEventRecord eventRecord) {
        String tag = eventRecord.getTag();
        try {
            if (eventRecord.getEventTime() != null) {
                this.fluency.emit(tag, eventRecord.getEventTime(), eventRecord.getData());
            } else if (eventRecord.getTimestamp() != null) {
                this.fluency.emit(tag, eventRecord.getTimestamp().longValue(), eventRecord.getData());
            } else {
                this.fluency.emit(tag, eventRecord.getData());
            }
        } catch (IOException e) {
            // Surface the failure to the framework; swallowing it would silently drop the record.
            throw new ConnectException("Failed to emit record to Fluentd (tag=" + tag + ")", e);
        }
    }

    /**
     * Flushes the Fluency client.
     *
     * @param map the offsets passed in by the framework; not used here
     * @throws ConnectException if the flush fails with an {@link IOException}
     */
    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        try {
            this.fluency.flush();
        } catch (IOException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * Waits for the client's buffers to drain and then closes the client.
     *
     * <p>Does nothing when no client exists, e.g. when {@link #start(Map)} failed before
     * creating one. A failure while closing is only logged.
     *
     * @throws ConnectException if the thread is interrupted while waiting; the interrupt
     *         flag is restored first and the client is still closed
     */
    public void stop() {
        Fluency client = this.fluency;
        if (client == null) {
            return;
        }
        this.fluency = null;
        try {
            client.waitUntilAllBufferFlushed(STOP_FLUSH_WAIT);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConnectException(e);
        } finally {
            try {
                client.close();
            } catch (IOException e) {
                // Best effort on shutdown: throwing here could mask an in-flight exception.
                log.warn("Failed to close Fluency client", e);
            }
        }
    }
}
