package pcosta.kafka.internal;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pcosta.kafka.api.MessageFilter;
import pcosta.kafka.api.MessageProducer;
import pcosta.kafka.api.MessagingException;
import pcosta.kafka.message.KafkaMessageProto;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:pcosta/kafka/internal/ProtobufProducer.class */
public final class ProtobufProducer<M> implements MessageProducer<M> {
    private static final Logger log = LoggerFactory.getLogger(ProtobufProducer.class);
    private final Serializer valueSerializer;
    private final Serializer keySerializer;
    private final Collection<MessageFilter> filters;
    private Map<String, KafkaSender<String, M>> kafkaSenders;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProtobufProducer(Serializer serializer, Serializer serializer2) {
        this(serializer, serializer2, Collections.emptyList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ProtobufProducer(Serializer serializer, Serializer serializer2, Collection<MessageFilter> collection) {
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
        this.kafkaSenders = new ConcurrentHashMap();
        this.filters = new ArrayList(collection);
    }

    @Override // pcosta.kafka.api.MessageProducer
    public void send(M m, String... strArr) throws MessagingException {
        send(m, null, strArr);
    }

    @Override // pcosta.kafka.api.MessageProducer
    public void send(M m, String str, String... strArr) throws MessagingException {
        Objects.requireNonNull(strArr, "Registered Invalid topics");
        send(m, str, null, strArr);
    }

    @Override // pcosta.kafka.api.MessageProducer
    public void send(M m, String str, String str2, String... strArr) throws MessagingException {
        Objects.requireNonNull(strArr, "Registered Invalid topics");
        log.debug("Transforming object {}", m);
        for (String str3 : strArr) {
            Objects.requireNonNull(str3, "Invalid topic");
            if (null == this.kafkaSenders.get(str3)) {
                synchronized (ProtobufProducer.class) {
                    if (null == this.kafkaSenders.get(str3)) {
                        this.kafkaSenders.put(str3, new KafkaSender<>(str3, this.keySerializer, this.valueSerializer));
                    }
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("sending message to kafka broker:{}topics: {}{}payload: {}{}", new Object[]{System.lineSeparator(), strArr, System.lineSeparator(), m.toString(), System.lineSeparator()});
        }
        for (String str4 : strArr) {
            Class<?> cls = m.getClass();
            String generateKey = null != str ? str : new StringMessageKey(str4, cls.getName()).generateKey();
            KafkaMessageProto.KafkaMessage m204build = KafkaMessageProto.KafkaMessage.newBuilder().setPayloadClass(m.getClass().getName()).setPayload(Any.pack((Message) m)).setTraceabilityId(str2 == null ? "" : str2).m204build();
            if (!isFiltered(str4, cls)) {
                this.kafkaSenders.get(str4).send(generateKey, m204build);
            }
        }
    }

    private boolean isFiltered(String str, Class<?> cls) {
        for (MessageFilter messageFilter : this.filters) {
            if (messageFilter.isEnabled() && messageFilter.filter(str, cls)) {
                log.debug("message {} is not going to be sent to {} due to filter: {}", new Object[]{cls, str, messageFilter.getClass().getSimpleName()});
                return true;
            }
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void close() {
        log.info("Stopping all kafka producers..");
        this.kafkaSenders.values().forEach((v0) -> {
            v0.stop();
        });
    }
}
