package org.radarbase.producer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.avro.SchemaValidationException;
import org.radarbase.data.AvroRecordData;
import org.radarbase.data.RecordData;
import org.radarbase.topic.AvroTopic;

/* loaded from: input_file:org/radarbase/producer/BatchedKafkaSender.class */
public class BatchedKafkaSender implements KafkaSender {
    private final KafkaSender wrappedSender;
    private final long ageNanos;
    private final int maxBatchSize;

    /* loaded from: input_file:org/radarbase/producer/BatchedKafkaSender$BatchedKafkaTopicSender.class */
    private class BatchedKafkaTopicSender<K, V> implements KafkaTopicSender<K, V> {
        private long nanoAdded;
        private K cachedKey;
        private final List<V> cache;
        private final KafkaTopicSender<K, V> topicSender;
        private final AvroTopic<K, V> topic;

        private BatchedKafkaTopicSender(AvroTopic<K, V> avroTopic) throws IOException, SchemaValidationException {
            this.cache = new ArrayList();
            this.topic = avroTopic;
            this.topicSender = BatchedKafkaSender.this.wrappedSender.sender(avroTopic);
        }

        @Override // org.radarbase.producer.KafkaTopicSender
        public void send(K k, V v) throws IOException, SchemaValidationException {
            if (!BatchedKafkaSender.this.isConnected()) {
                throw new IOException("Cannot send records to unconnected producer.");
            }
            trySend(k, v);
        }

        @Override // org.radarbase.producer.KafkaTopicSender
        public void send(RecordData<K, V> recordData) throws IOException, SchemaValidationException {
            if (recordData.isEmpty()) {
                return;
            }
            K key = recordData.getKey();
            Iterator<V> it = recordData.iterator();
            while (it.hasNext()) {
                trySend(key, it.next());
            }
        }

        private void trySend(K k, V v) throws IOException, SchemaValidationException {
            boolean equals;
            if (this.cache.isEmpty()) {
                this.cachedKey = k;
                this.nanoAdded = System.nanoTime();
                equals = true;
            } else {
                equals = Objects.equals(k, this.cachedKey);
            }
            if (!equals) {
                doSend();
                trySend(k, v);
            } else {
                this.cache.add(v);
                if (exceedsBuffer(this.cache)) {
                    doSend();
                }
            }
        }

        private void doSend() throws IOException, SchemaValidationException {
            this.topicSender.send(new AvroRecordData(this.topic, this.cachedKey, this.cache));
            this.cache.clear();
            this.cachedKey = null;
        }

        @Override // org.radarbase.producer.KafkaTopicSender
        public void clear() {
            this.cache.clear();
            this.topicSender.clear();
        }

        @Override // org.radarbase.producer.KafkaTopicSender
        public void flush() throws IOException {
            if (!this.cache.isEmpty()) {
                try {
                    doSend();
                } catch (SchemaValidationException e) {
                    throw new IOException("Schemas do not match", e);
                }
            }
            this.topicSender.flush();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            try {
                flush();
            } finally {
                BatchedKafkaSender.this.wrappedSender.close();
            }
        }

        private boolean exceedsBuffer(List<?> list) {
            return list.size() >= BatchedKafkaSender.this.maxBatchSize || System.nanoTime() - this.nanoAdded >= BatchedKafkaSender.this.ageNanos;
        }
    }

    public BatchedKafkaSender(KafkaSender kafkaSender, int i, int i2) {
        this.wrappedSender = kafkaSender;
        this.ageNanos = TimeUnit.MILLISECONDS.toNanos(i);
        this.maxBatchSize = i2;
    }

    @Override // org.radarbase.producer.KafkaSender
    public <K, V> KafkaTopicSender<K, V> sender(AvroTopic<K, V> avroTopic) throws IOException, SchemaValidationException {
        return new BatchedKafkaTopicSender(avroTopic);
    }

    @Override // org.radarbase.producer.KafkaSender
    public boolean isConnected() throws AuthenticationException {
        return this.wrappedSender.isConnected();
    }

    @Override // org.radarbase.producer.KafkaSender
    public boolean resetConnection() throws AuthenticationException {
        return this.wrappedSender.resetConnection();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        this.wrappedSender.close();
    }
}
