package com.github.hekonsek.rxjava.connector.kafka;

import com.github.hekonsek.rxjava.event.Event;
import com.google.common.collect.ImmutableMap;
import io.reactivex.Observable;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.kafka.client.consumer.KafkaConsumer;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/* loaded from: input_file:com/github/hekonsek/rxjava/connector/kafka/KafkaSource.class */
/**
 * Fluent builder that turns a Kafka topic into an RxJava {@link Observable} of {@link Event}s,
 * using the Vert.x reactive Kafka consumer.
 *
 * <p>Defaults, unless overridden through the fluent setters:
 * <ul>
 *   <li>bootstrap servers: {@code localhost:9092}</li>
 *   <li>group id: a random UUID generated when this source is constructed</li>
 *   <li>event adapter: {@code KafkaEventAdapter.stringAndBytesToMap()}</li>
 * </ul>
 * The consumer is always configured with {@code auto.offset.reset=earliest}.
 *
 * @param <K> type of the Kafka record key
 * @param <V> type of the payload carried by the emitted events
 */
public class KafkaSource<K, V> {
    private final Vertx vertx;
    private final String topic;
    private String bootstrapServers = "localhost:9092";
    private String groupId = UUID.randomUUID().toString();

    // The default adapter's type arguments are fixed by stringAndBytesToMap() and cannot be
    // verified against this instance's <K, V>, hence the unchecked cast.
    // NOTE(review): callers whose K/V differ from what the default adapter produces should
    // supply their own adapter via eventAdapter(...) - confirm against KafkaEventAdapter.
    @SuppressWarnings("unchecked")
    private KafkaEventAdapter<K, V> eventAdapter = (KafkaEventAdapter<K, V>) KafkaEventAdapter.stringAndBytesToMap();

    /**
     * Creates a source reading from the given topic.
     *
     * @param vertx Vert.x instance used to create the Kafka consumer
     * @param str name of the Kafka topic to subscribe to
     * @throws NullPointerException if either argument is {@code null}
     */
    public KafkaSource(Vertx vertx, String str) {
        this.vertx = Objects.requireNonNull(vertx, "vertx");
        this.topic = Objects.requireNonNull(str, "topic");
    }

    /**
     * Creates a Kafka consumer from the current settings, subscribes it to the topic and
     * exposes the consumed records as events converted by the configured adapter's mapping.
     *
     * @return observable of events mapped from the consumed records
     */
    public Observable<Event<V>> build() {
        Map<String, String> config = ImmutableMap.of(
                "group.id", this.groupId,
                "bootstrap.servers", this.bootstrapServers,
                "key.deserializer", this.eventAdapter.getKeyDeserializer().getName(),
                "value.deserializer", this.eventAdapter.getValueDeserializer().getName(),
                "auto.offset.reset", "earliest");

        // Explicit type arguments keep the consumer typed as <K, V>; as the receiver of a
        // call chain, create(...) would otherwise be inferred as <Object, Object>.
        return KafkaConsumer.<K, V>create(this.vertx, config)
                .subscribe(this.topic)
                .toObservable()
                .map(this.eventAdapter.getMapping());
    }

    /**
     * Sets the value used for the consumer's {@code bootstrap.servers} property.
     *
     * @param str bootstrap servers string
     * @return this source, for chaining
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public KafkaSource<K, V> bootstrapServers(String str) {
        this.bootstrapServers = Objects.requireNonNull(str, "bootstrapServers");
        return this;
    }

    /**
     * Sets the value used for the consumer's {@code group.id} property.
     *
     * @param str consumer group id
     * @return this source, for chaining
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public KafkaSource<K, V> groupId(String str) {
        this.groupId = Objects.requireNonNull(str, "groupId");
        return this;
    }

    /**
     * Sets the adapter that supplies the key/value deserializer classes and the mapping
     * applied to each consumed record.
     *
     * @param kafkaEventAdapter adapter to use
     * @return this source, for chaining
     * @throws NullPointerException if {@code kafkaEventAdapter} is {@code null}
     */
    public KafkaSource<K, V> eventAdapter(KafkaEventAdapter<K, V> kafkaEventAdapter) {
        this.eventAdapter = Objects.requireNonNull(kafkaEventAdapter, "eventAdapter");
        return this;
    }
}
