package ch.squaredesk.nova.comm.kafka;

import ch.squaredesk.nova.comm.CommAdapterBuilder;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ch/squaredesk/nova/comm/kafka/KafkaAdapter.class */
/**
 * Facade that bundles a {@link KafkaMessageSender} and a {@link KafkaMessageReceiver}
 * behind a small send / subscribe / shutdown API.
 *
 * <p>Instances are configured through {@link Builder}, obtained via {@link #builder(Class)}.
 *
 * @param <InternalMessageType> the message type handled by this adapter
 */
public class KafkaAdapter<InternalMessageType> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAdapter.class);
    private final KafkaMessageSender<InternalMessageType> messageSender;
    private final KafkaMessageReceiver<InternalMessageType> messageReceiver;

    /**
     * Collects the configuration for a {@link KafkaAdapter}.
     *
     * <p>Consumer and producer properties supplied by the caller always win: the defaults
     * applied in {@link #m1createInstance()} are only written for keys that are not present yet.
     *
     * @param <InternalMessageType> the message type handled by the adapter being built
     */
    public static class Builder<InternalMessageType> extends CommAdapterBuilder<InternalMessageType, KafkaAdapter<InternalMessageType>> {
        // Both property sets are created once and only ever mutated in place, so they can never be null.
        private final Properties consumerProperties = new Properties();
        private final Properties producerProperties = new Properties();
        private String serverAddress;
        private String identifier;
        // NOTE(review): this scheduler is assigned here but not read anywhere in this class;
        // confirm whether it is still needed by the sender/receiver wiring.
        private Scheduler subscriptionScheduler;

        private Builder(Class<InternalMessageType> messageTypeClass) {
            super(messageTypeClass);
        }

        /**
         * Merges all entries of {@code properties} into the consumer configuration.
         * Entries added earlier are kept unless overwritten by the same key; nothing is cleared.
         *
         * @param properties entries to merge; {@code null} is ignored
         * @return this builder
         */
        public Builder<InternalMessageType> setConsumerProperties(Properties properties) {
            if (properties != null) {
                this.consumerProperties.putAll(properties);
            }
            return this;
        }

        /**
         * Stores a single key/value pair in {@code target} after rejecting null keys and values.
         *
         * @throws NullPointerException if {@code key} or {@code value} is {@code null}
         */
        private Builder<InternalMessageType> addProperty(Properties target, String key, String value) {
            Objects.requireNonNull(key, "property key must not be null");
            Objects.requireNonNull(value, "value for property " + key + " must not be null");
            target.setProperty(key, value);
            return this;
        }

        /**
         * Adds (or overwrites) one consumer property.
         *
         * @param key   property key, must not be {@code null}
         * @param value property value, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code key} or {@code value} is {@code null}
         */
        public Builder<InternalMessageType> addConsumerProperty(String key, String value) {
            return addProperty(this.consumerProperties, key, value);
        }

        /**
         * Adds (or overwrites) one producer property.
         *
         * @param key   property key, must not be {@code null}
         * @param value property value, must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code key} or {@code value} is {@code null}
         */
        public Builder<InternalMessageType> addProducerProperty(String key, String value) {
            return addProperty(this.producerProperties, key, value);
        }

        /**
         * Merges all entries of {@code properties} into the producer configuration.
         * Entries added earlier are kept unless overwritten by the same key; nothing is cleared.
         *
         * @param properties entries to merge; {@code null} is ignored
         * @return this builder
         */
        public Builder<InternalMessageType> setProducerProperties(Properties properties) {
            if (properties != null) {
                this.producerProperties.putAll(properties);
            }
            return this;
        }

        /**
         * Sets the address used as default {@code bootstrap.servers} value for consumer and producer.
         *
         * @param serverAddress the server address; checked for {@code null} in {@link #validate()}
         * @return this builder
         */
        public Builder<InternalMessageType> setServerAddress(String serverAddress) {
            this.serverAddress = serverAddress;
            return this;
        }

        /**
         * Sets the scheduler stored as subscription scheduler. If none is set, {@link #validate()}
         * installs a single-threaded default.
         *
         * @param scheduler the scheduler to use, may be {@code null}
         * @return this builder
         */
        public Builder<InternalMessageType> setSubscriptionScheduler(Scheduler scheduler) {
            this.subscriptionScheduler = scheduler;
            return this;
        }

        /**
         * Sets the identifier from which the default {@code client.id} and {@code group.id} are derived
         * and which is handed to the sender and receiver.
         *
         * @param identifier the identifier, may be {@code null}
         * @return this builder
         */
        public Builder<InternalMessageType> setIdentifier(String identifier) {
            this.identifier = identifier;
            return this;
        }

        /**
         * Checks that all mandatory settings are present and fills in the default subscription scheduler.
         *
         * @throws NullPointerException if the server address, the message (un)marshallers or the metrics
         *                              have not been provided
         */
        public void validate() {
            Objects.requireNonNull(this.serverAddress, "serverAddress must be provided");
            Objects.requireNonNull(this.messageUnmarshaller, "messageUnmarshaller must be provided");
            Objects.requireNonNull(this.messageMarshaller, "messageMarshaller must be provided");
            Objects.requireNonNull(this.metrics, "metrics must be provided");
            if (this.subscriptionScheduler == null) {
                // Default: one daemon thread named "KafkaSubscription", so it never blocks JVM exit.
                this.subscriptionScheduler = Schedulers.from(Executors.newSingleThreadExecutor(task -> {
                    Thread subscriptionThread = new Thread(task, "KafkaSubscription");
                    subscriptionThread.setDaemon(true);
                    return subscriptionThread;
                }));
            }
        }

        /**
         * Applies the default Kafka settings that the caller did not override, then creates the
         * sender, the receiver and the adapter wrapping both.
         *
         * <p>Defaults: {@code bootstrap.servers} is the configured server address, String
         * (de)serializers are used for keys and values, {@code client.id} is the identifier or
         * {@code "KafkaAdapter-" + <random UUID>} if no identifier was set, and the consumer
         * {@code group.id} is {@code identifier + "ReadGroup"} or {@code "KafkaAdapter-ReadGroup"}.
         *
         * <p>Decompiler note: this method was originally named {@code createInstance}; it was renamed
         * when it was merged with its bridge method.
         *
         * @return the newly created adapter
         */
        public KafkaAdapter<InternalMessageType> m1createInstance() {
            final boolean anonymous = this.identifier == null;
            final String clientId = anonymous ? "KafkaAdapter-" + UUID.randomUUID() : this.identifier;
            final String groupId = anonymous ? "KafkaAdapter-ReadGroup" : this.identifier + "ReadGroup";

            setPropertyIfNotPresent(this.consumerProperties, "bootstrap.servers", this.serverAddress);
            setPropertyIfNotPresent(this.consumerProperties, "key.deserializer", StringDeserializer.class.getName());
            setPropertyIfNotPresent(this.consumerProperties, "value.deserializer", StringDeserializer.class.getName());
            setPropertyIfNotPresent(this.consumerProperties, "client.id", clientId);
            setPropertyIfNotPresent(this.consumerProperties, "group.id", groupId);

            setPropertyIfNotPresent(this.producerProperties, "bootstrap.servers", this.serverAddress);
            setPropertyIfNotPresent(this.producerProperties, "key.serializer", StringSerializer.class.getName());
            setPropertyIfNotPresent(this.producerProperties, "value.serializer", StringSerializer.class.getName());
            setPropertyIfNotPresent(this.producerProperties, "client.id", clientId);

            // NOTE(review): the raw (possibly null) identifier is passed on here, not the generated
            // client id; confirm that sender and receiver are meant to see null for anonymous adapters.
            KafkaMessageReceiver<InternalMessageType> receiver =
                    new KafkaMessageReceiver<>(this.identifier, this.consumerProperties, this.messageUnmarshaller, this.metrics);
            KafkaMessageSender<InternalMessageType> sender =
                    new KafkaMessageSender<>(this.identifier, this.producerProperties, this.messageMarshaller, this.metrics);
            return new KafkaAdapter<>(sender, receiver);
        }

        /** Writes {@code value} under {@code key} unless {@code target} already contains that key. */
        private static void setPropertyIfNotPresent(Properties target, String key, String value) {
            if (!target.containsKey(key)) {
                target.setProperty(key, value);
            }
        }
    }

    /**
     * Creates an adapter on top of the given sender and receiver.
     *
     * @param kafkaMessageSender   used by {@link #sendMessage(String, Object)}
     * @param kafkaMessageReceiver used by {@link #messages(String)} and {@link #shutdown()}
     */
    KafkaAdapter(KafkaMessageSender<InternalMessageType> kafkaMessageSender, KafkaMessageReceiver<InternalMessageType> kafkaMessageReceiver) {
        this.messageReceiver = kafkaMessageReceiver;
        this.messageSender = kafkaMessageSender;
    }

    /**
     * Sends {@code message} to {@code destination} via the underlying sender, attaching a fresh
     * {@link KafkaSpecificInfo}.
     *
     * @param destination the destination handed to the sender
     * @param message     the message to send, must not be {@code null}
     * @return the {@link Completable} returned by the sender
     * @throws NullPointerException if {@code message} is {@code null}
     */
    public <ConcreteMessageType extends InternalMessageType> Completable sendMessage(String destination, ConcreteMessageType message) {
        Objects.requireNonNull(message, "message must not be null");
        return this.messageSender.sendMessage(destination, message, new KafkaSpecificInfo());
    }

    /**
     * Returns the stream of messages the underlying receiver provides for {@code destination},
     * unwrapped to the plain message payload.
     *
     * @param destination the destination handed to the receiver
     * @return a {@link Flowable} emitting the {@code message} field of each incoming message
     */
    public Flowable<InternalMessageType> messages(String destination) {
        return this.messageReceiver.messages(destination).map(incomingMessage -> incomingMessage.message);
    }

    /**
     * Shuts down the underlying receiver and logs the shutdown.
     *
     * <p>NOTE(review): only the receiver is shut down here; confirm whether the sender holds
     * resources that need to be released as well.
     */
    public void shutdown() {
        this.messageReceiver.shutdown();
        logger.info("KafkaAdapter shut down");
    }

    /**
     * Creates a new {@link Builder} for adapters handling messages of the given type.
     *
     * @param messageTypeClass the class object of the message type
     * @return a fresh builder
     */
    public static <InternalMessageType> Builder<InternalMessageType> builder(Class<InternalMessageType> messageTypeClass) {
        return new Builder<>(messageTypeClass);
    }
}
