package pcosta.kafka.internal;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pcosta.kafka.api.MessageFilter;
import pcosta.kafka.api.MessageProducer;
import pcosta.kafka.api.MessageReceiverConfiguration;
import pcosta.kafka.api.MessagingContext;
import pcosta.kafka.api.MessagingException;

/* loaded from: input_file:pcosta/kafka/internal/KafkaContext.class */
/**
 * {@link MessagingContext} implementation that holds at most one {@link MessageReceiver}
 * and a registry of {@link ProtobufProducer} instances keyed by name.
 *
 * <p>The producer registry is a {@link ConcurrentHashMap}. The {@code receiver} field is a
 * plain field and is not guarded by any synchronization in this class.
 */
class KafkaContext implements MessagingContext {
    private static final Logger log = LoggerFactory.getLogger(KafkaContext.class);
    private MessageReceiver receiver;
    private final Map<String, ProtobufProducer> producers = new ConcurrentHashMap<>();

    /**
     * Creates the receiver of this context, registers the configured error listener and
     * message listeners on it, and starts it.
     *
     * @param messageReceiverConfiguration source of the error listener and listener configurations
     * @throws IllegalStateException if this context already holds a receiver
     * @throws NullPointerException if {@code messageReceiverConfiguration} is {@code null}
     * @throws MessagingException declared by {@link MessagingContext}
     */
    @Override // pcosta.kafka.api.MessagingContext
    public void createReceiver(MessageReceiverConfiguration messageReceiverConfiguration) throws MessagingException {
        if (this.receiver != null) {
            throw new IllegalStateException("Receiver has been created already");
        }
        // Validate before touching the field: failing on a null configuration after the
        // assignment would leave an unconfigured receiver behind and block every retry.
        Objects.requireNonNull(messageReceiverConfiguration, "messageReceiverConfiguration must not be null");
        this.receiver = new MessageReceiver();
        this.receiver.registerErrorListener(messageReceiverConfiguration.getErrorListener());
        messageReceiverConfiguration.getListeners().forEach(listenerConfiguration -> {
            this.receiver.registerListener(listenerConfiguration);
        });
        initializeMessageReceiver();
    }

    /**
     * Returns the producer registered under {@code str}, creating and registering one from
     * the given serializers when the name is not in use yet.
     *
     * <p>When a producer with that name already exists it is returned as-is and the
     * serializers passed to this call are ignored.
     *
     * @param str name the producer is registered under; must not be {@code null}
     * @param serializer first serializer handed to the {@link ProtobufProducer} constructor
     * @param serializer2 second serializer handed to the {@link ProtobufProducer} constructor
     * @return the producer registered under {@code str}
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws MessagingException declared by {@link MessagingContext}
     */
    @Override // pcosta.kafka.api.MessagingContext
    public <M> MessageProducer<M> createProducer(String str, Serializer serializer, Serializer serializer2) throws MessagingException {
        ProtobufProducer existing = findProducer(str);
        if (existing != null) {
            return existing;
        }
        return registerProducer(str, new ProtobufProducer(serializer, serializer2));
    }

    /**
     * Returns the producer registered under {@code str}, creating and registering one from
     * the given serializers and filters when the name is not in use yet.
     *
     * <p>When a producer with that name already exists it is returned as-is and the
     * serializers and filters passed to this call are ignored.
     *
     * @param str name the producer is registered under; must not be {@code null}
     * @param serializer first serializer handed to the {@link ProtobufProducer} constructor
     * @param serializer2 second serializer handed to the {@link ProtobufProducer} constructor
     * @param collection message filters handed to the {@link ProtobufProducer} constructor
     * @return the producer registered under {@code str}
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws MessagingException declared by {@link MessagingContext}
     */
    @Override // pcosta.kafka.api.MessagingContext
    public <M> MessageProducer<M> createProducer(String str, Serializer serializer, Serializer serializer2, Collection<MessageFilter> collection) throws MessagingException {
        ProtobufProducer existing = findProducer(str);
        if (existing != null) {
            return existing;
        }
        return registerProducer(str, new ProtobufProducer(serializer, serializer2, collection));
    }

    /**
     * Closes the receiver, if any, and then every registered producer. Producers are removed
     * from the registry as they are closed, so a later {@code createProducer} call builds a
     * fresh instance and a repeated {@code shutdown()} does not close the same producer twice.
     *
     * @throws MessagingException declared by {@link MessagingContext}
     */
    @Override // pcosta.kafka.api.MessagingContext
    public void shutdown() throws MessagingException {
        log.info("kafka context is shutting down..");
        MessageReceiver activeReceiver = this.receiver;
        this.receiver = null;
        try {
            if (activeReceiver != null) {
                activeReceiver.close();
            }
        } finally {
            // Producers must be released even when closing the receiver fails.
            closeProducers();
        }
    }

    /** Starts the receiver held by this context. */
    private void initializeMessageReceiver() {
        this.receiver.start();
    }

    /** Looks up the producer registered under {@code name}; {@code null} when there is none. */
    private ProtobufProducer findProducer(String name) {
        Objects.requireNonNull(name, "producer name must not be null");
        return this.producers.get(name);
    }

    /**
     * Registers {@code candidate} under {@code name} unless another producer got there first,
     * in which case the candidate is closed and the already registered producer is returned.
     */
    private ProtobufProducer registerProducer(String name, ProtobufProducer candidate) {
        ProtobufProducer winner = this.producers.putIfAbsent(name, candidate);
        if (winner == null) {
            return candidate;
        }
        // Lost the race against a concurrent registration: release the unused instance
        // instead of dropping it unclosed.
        candidate.close();
        return winner;
    }

    /** Removes every producer from the registry and closes it. */
    private void closeProducers() {
        for (String name : this.producers.keySet()) {
            ProtobufProducer removed = this.producers.remove(name);
            if (removed != null) {
                removed.close();
            }
        }
    }
}
