package org.fluentd.kafka;

import influent.forward.ForwardCallback;
import influent.forward.ForwardServer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fluentd/kafka/FluentdSourceTask.class */
public class FluentdSourceTask extends SourceTask {
    private FluentdSourceConnectorConfig config;
    private ForwardServer server;
    private final ConcurrentLinkedDeque<SourceRecord> queue = new ConcurrentLinkedDeque<>();
    static final Logger log = LoggerFactory.getLogger(FluentdSourceTask.class);
    private static final Reporter reporter = new Reporter();

    /* loaded from: input_file:org/fluentd/kafka/FluentdSourceTask$Reporter.class */
    private static final class Reporter implements Runnable {
        private final AtomicLong counter;

        private Reporter() {
            this.counter = new AtomicLong();
        }

        void add(int i) {
            this.counter.addAndGet(i);
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            while (true) {
                try {
                    Thread.sleep(100L);
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (currentTimeMillis2 - currentTimeMillis >= 1000) {
                        currentTimeMillis = currentTimeMillis2;
                        FluentdSourceTask.log.info("{} requests/sec", Long.valueOf(this.counter.getAndSet(0L)));
                    }
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    public String version() {
        return VersionUtil.getVersion();
    }

    public void start(Map<String, String> map) {
        this.config = new FluentdSourceConnectorConfig(map);
        MessagePackConverver messagePackConverver = new MessagePackConverver(this.config);
        ForwardCallback of = ForwardCallback.of(eventStream -> {
            if (this.config.getFluentdCounterEnabled()) {
                reporter.add(eventStream.getEntries().size());
            }
            eventStream.getEntries().forEach(eventEntry -> {
                String fluentdStaticTopic = this.config.getFluentdStaticTopic();
                String name = eventStream.getTag().getName();
                if (fluentdStaticTopic == null) {
                    fluentdStaticTopic = name;
                }
                this.queue.add(messagePackConverver.convert(fluentdStaticTopic, name, Long.valueOf(eventEntry.getTime().toEpochMilli()), eventEntry));
            });
            return CompletableFuture.completedFuture(null);
        });
        try {
            if (!this.config.getFluentdTransport().equals("tcp") && !this.config.getFluentdTransport().equals("tls")) {
                throw new FluentdConnectorConfigError("fluentd.transport must be \"tcp\" or \"tls\"");
            }
            ForwardServer.Builder keyPassword = new ForwardServer.Builder(of).localAddress(this.config.getLocalAddress()).chunkSizeLimit(this.config.getFluentdChunkSizeLimit().longValue()).backlog(this.config.getFluentdBacklog()).keepAliveEnabled(this.config.getFluentdKeepAliveEnabled()).tcpNoDelayEnabled(this.config.getFluentdTcpNoDeleyEnabled()).sslEnabled(this.config.getFluentdTransport().equals("tls")).tlsVersions((String[]) this.config.getFluentdTlsVersions().toArray(new String[0])).keystorePath(this.config.getFluentdKeystorePath()).keystorePassword(this.config.getFluentdKeystorePassword()).keyPassword(this.config.getFluentdKeyPassword());
            if (this.config.getFluentdSendBufferSize() != 0) {
                keyPassword.sendBufferSize(this.config.getFluentdSendBufferSize());
            }
            if (this.config.getFluentdReceveBufferSize() != 0) {
                keyPassword.receiveBufferSize(this.config.getFluentdReceveBufferSize());
            }
            if (this.config.getFluentdWorkerPoolSize() != 0) {
                keyPassword.workerPoolSize(this.config.getFluentdWorkerPoolSize());
            }
            this.server = keyPassword.build();
            this.server.start();
            if (this.config.getFluentdCounterEnabled()) {
                new Thread(reporter).start();
            }
        } catch (FluentdConnectorConfigError e) {
            throw new ConnectException(e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        ArrayList arrayList = new ArrayList();
        while (!this.queue.isEmpty()) {
            SourceRecord poll = this.queue.poll();
            log.debug("{}", poll);
            if (poll != null) {
                arrayList.add(poll);
            }
        }
        if (arrayList.isEmpty()) {
            synchronized (this) {
                wait(1000L);
            }
        }
        return arrayList;
    }

    public void stop() {
        this.server.shutdown();
    }
}
