package io.atleon.kafka;

import android.R;
import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.util.ConfigLoading;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

/* loaded from: input_file:io/atleon/kafka/AloKafkaSender.class */
/**
 * Reactive facade over a Reactor {@link KafkaSender} that sends plain values, {@link ProducerRecord}s,
 * or their {@link Alo}-wrapped counterparts, and emits a {@link KafkaSenderResult} per sent item.
 *
 * <p>The underlying {@link KafkaSender} is built from the configuration produced by a
 * {@link KafkaConfigSource}. The resulting {@link Mono} is cached, so every send operation issued
 * through one instance of this class shares the same sender.
 *
 * @param <K> the type of keys in sent records
 * @param <V> the type of values in sent records
 */
public class AloKafkaSender<K, V> {

    /** Prefix intended for configuration keys that are specific to this sender. */
    public static final String CONFIG_PREFIX = "kafka.sender.";

    /**
     * Configuration key whose value is applied through {@link SenderOptions#maxInFlight(int)}.
     * Defaults to {@code 256} when absent.
     */
    public static final String MAX_IN_FLIGHT_PER_SEND_CONFIG = CONFIG_PREFIX + "max.in.flight.per.send";

    /**
     * Configuration key whose value is applied through {@link SenderOptions#stopOnError(boolean)}.
     * Defaults to {@code false} when absent.
     *
     * <p>NOTE(review): unlike {@link #MAX_IN_FLIGHT_PER_SEND_CONFIG}, this key does not carry
     * {@link #CONFIG_PREFIX}. The value is kept as-is for compatibility; confirm it is intentional.
     */
    public static final String STOP_ON_ERROR_CONFIG = "stop.on.error";

    private static final String CLIENT_ID_CONFIG = "client.id";

    private static final int DEFAULT_MAX_IN_FLIGHT_PER_SEND = 256;

    private static final boolean DEFAULT_STOP_ON_ERROR = false;

    /** Number of sender option sets created so far, keyed by configured (base) client id. */
    private static final Map<String, AtomicLong> COUNTS_BY_CLIENT_ID = new ConcurrentHashMap<>();

    /** Scheduler shared by all senders configured with the same (base) client id. */
    private static final Map<String, Scheduler> SCHEDULERS_BY_CLIENT_ID = new ConcurrentHashMap<>();

    private final Mono<KafkaSender<K, V>> futureKafkaSender;

    private AloKafkaSender(KafkaConfigSource kafkaConfigSource) {
        // cache() ensures the sender is created once and reused by every subsequent subscription.
        this.futureKafkaSender = kafkaConfigSource.create()
            .<SenderOptions<K, V>>map(AloKafkaSender::newSenderOptions)
            .map(KafkaSender::create)
            .cache();
    }

    /**
     * Creates a sender whose configuration is produced by the given source.
     *
     * @param kafkaConfigSource source of the configuration used to build the underlying sender
     * @param <K> the type of keys in sent records
     * @param <V> the type of values in sent records
     * @return a new sender
     */
    public static <K, V> AloKafkaSender<K, V> from(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaSender<>(kafkaConfigSource);
    }

    /**
     * Creates a sender whose key type is fixed to {@link Object}.
     *
     * @param kafkaConfigSource source of the configuration used to build the underlying sender
     * @param <V> the type of values in sent records
     * @return a new sender
     */
    public static <V> AloKafkaSender<Object, V> forValues(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaSender<>(kafkaConfigSource);
    }

    /**
     * Sends each published record, using the record itself as the correlation metadata of its result.
     *
     * @param records the records to send
     * @return the results of sending, one per sender result emitted by the underlying sender
     */
    public Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(Publisher<ProducerRecord<K, V>> records) {
        return futureKafkaSender.flatMapMany(sender -> sendRecords(sender, records));
    }

    /**
     * Creates a function that sends published values to a fixed topic.
     *
     * @param topic the topic every value is sent to
     * @param valueToKey derives the record key from a value
     * @return a function suitable for transforming a publisher of values into send results
     */
    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(
        String topic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return values -> sendValues(values, topic, valueToKey);
    }

    /**
     * Sends each published value to a fixed topic.
     *
     * @param values the values to send
     * @param topic the topic every value is sent to
     * @param valueToKey derives the record key from a value
     * @return the results of sending, each correlated with the value that was sent
     */
    public Flux<KafkaSenderResult<V>> sendValues(
        Publisher<V> values,
        String topic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return sendValues(values, value -> topic, valueToKey);
    }

    /**
     * Creates a function that sends published values to a topic derived from each value.
     *
     * @param valueToTopic derives the destination topic from a value
     * @param valueToKey derives the record key from a value
     * @return a function suitable for transforming a publisher of values into send results
     */
    public Function<Publisher<V>, Flux<KafkaSenderResult<V>>> sendValues(
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return values -> sendValues(values, valueToTopic, valueToKey);
    }

    /**
     * Sends each published value to a topic derived from that value.
     *
     * @param values the values to send
     * @param valueToTopic derives the destination topic from a value
     * @param valueToKey derives the record key from a value
     * @return the results of sending, each correlated with the value that was sent
     */
    public Flux<KafkaSenderResult<V>> sendValues(
        Publisher<V> values,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return futureKafkaSender.flatMapMany(sender -> sendValues(sender, values, valueToTopic, valueToKey));
    }

    /**
     * Creates a function that sends {@link Alo}-wrapped values to a fixed topic.
     *
     * @param topic the topic every value is sent to
     * @param valueToKey derives the record key from a value
     * @return a function suitable for transforming a publisher of {@link Alo} values into send results
     */
    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(
        String topic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return aloValues -> sendAloValues(aloValues, topic, valueToKey);
    }

    /**
     * Sends each {@link Alo}-wrapped value to a fixed topic.
     *
     * @param aloValues the wrapped values to send
     * @param topic the topic every value is sent to
     * @param valueToKey derives the record key from a value
     * @return an {@link AloFlux} of the results of sending
     */
    public AloFlux<KafkaSenderResult<V>> sendAloValues(
        Publisher<Alo<V>> aloValues,
        String topic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return sendAloValues(aloValues, value -> topic, valueToKey);
    }

    /**
     * Creates a function that sends {@link Alo}-wrapped values to a topic derived from each value.
     *
     * @param valueToTopic derives the destination topic from a value
     * @param valueToKey derives the record key from a value
     * @return a function suitable for transforming a publisher of {@link Alo} values into send results
     */
    public Function<Publisher<Alo<V>>, AloFlux<KafkaSenderResult<V>>> sendAloValues(
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return aloValues -> sendAloValues(aloValues, valueToTopic, valueToKey);
    }

    /**
     * Sends each {@link Alo}-wrapped value to a topic derived from that value.
     *
     * @param aloValues the wrapped values to send
     * @param valueToTopic derives the destination topic from a value
     * @param valueToKey derives the record key from a value
     * @return an {@link AloFlux} of the results of sending
     */
    public AloFlux<KafkaSenderResult<V>> sendAloValues(
        Publisher<Alo<V>> aloValues,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return futureKafkaSender
            .flatMapMany(sender -> sendAloValues(sender, aloValues, valueToTopic, valueToKey))
            .as(AloFlux::wrap);
    }

    /**
     * Sends each {@link Alo}-wrapped record.
     *
     * @param aloRecords the wrapped records to send
     * @return an {@link AloFlux} of the results of sending
     */
    public AloFlux<KafkaSenderResult<ProducerRecord<K, V>>> sendAloRecords(
        Publisher<Alo<ProducerRecord<K, V>>> aloRecords
    ) {
        return futureKafkaSender
            .flatMapMany(sender -> sendAloRecords(sender, aloRecords))
            .as(AloFlux::wrap);
    }

    /** Maps values to sender records, sends them with the given sender and converts the results. */
    private Flux<KafkaSenderResult<V>> sendValues(
        KafkaSender<K, V> kafkaSender,
        Publisher<V> values,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return Flux.from(values)
            .map(value -> createSenderRecordOfValue(value, valueToTopic, valueToKey))
            .transform(kafkaSender::send)
            .map(KafkaSenderResult::fromSenderResult);
    }

    /** Builds a sender record for the value, with the value itself as correlation metadata. */
    private SenderRecord<K, V, V> createSenderRecordOfValue(
        V value,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        // No partition is specified on the record (null).
        ProducerRecord<K, V> producerRecord =
            new ProducerRecord<>(valueToTopic.apply(value), null, valueToKey.apply(value), value);
        return SenderRecord.create(producerRecord, value);
    }

    /** Sends records with the given sender, correlating each result with the record that was sent. */
    private Flux<KafkaSenderResult<ProducerRecord<K, V>>> sendRecords(
        KafkaSender<K, V> kafkaSender,
        Publisher<ProducerRecord<K, V>> records
    ) {
        return Flux.from(records)
            .map(producerRecord -> SenderRecord.create(producerRecord, producerRecord))
            .transform(kafkaSender::send)
            .map(KafkaSenderResult::fromSenderResult);
    }

    /** Maps wrapped values to sender records, sends them and converts the results. */
    private Flux<Alo<KafkaSenderResult<V>>> sendAloValues(
        KafkaSender<K, V> kafkaSender,
        Publisher<Alo<V>> aloValues,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        return AloFlux.toFlux(aloValues)
            .map(aloValue -> createSenderRecordOfAloValue(aloValue, valueToTopic, valueToKey))
            .transform(kafkaSender::send)
            .map(KafkaSenderResult::fromSenderResultOfAlo);
    }

    /** Builds a sender record for the wrapped value, with the wrapper as correlation metadata. */
    private SenderRecord<K, V, Alo<V>> createSenderRecordOfAloValue(
        Alo<V> aloValue,
        Function<? super V, String> valueToTopic,
        Function<? super V, ? extends K> valueToKey
    ) {
        V value = aloValue.get();
        // No partition is specified on the record (null).
        ProducerRecord<K, V> producerRecord =
            new ProducerRecord<>(valueToTopic.apply(value), null, valueToKey.apply(value), value);
        return SenderRecord.create(producerRecord, aloValue);
    }

    /** Sends wrapped records with the given sender, with each wrapper as correlation metadata. */
    private Flux<Alo<KafkaSenderResult<ProducerRecord<K, V>>>> sendAloRecords(
        KafkaSender<K, V> kafkaSender,
        Publisher<Alo<ProducerRecord<K, V>>> aloRecords
    ) {
        return AloFlux.toFlux(aloRecords)
            .map(aloRecord -> SenderRecord.create(aloRecord.get(), aloRecord))
            .transform(kafkaSender::send)
            .map(KafkaSenderResult::fromSenderResultOfAlo);
    }

    /**
     * Builds sender options from the provided configuration.
     *
     * <p>Sender-specific keys ({@link #MAX_IN_FLIGHT_PER_SEND_CONFIG}, {@link #STOP_ON_ERROR_CONFIG})
     * are stripped from the properties handed to {@link SenderOptions#create(Map)} and applied
     * through the corresponding option setters instead. The configured {@code client.id} is
     * suffixed with an incrementing counter, and a scheduler shared per base client id is applied.
     *
     * @param config the full configuration, which must contain {@code client.id}
     * @return the sender options with all settings applied
     */
    private static <K, V> SenderOptions<K, V> newSenderOptions(Map<String, Object> config) {
        Map<String, Object> senderConfig = new HashMap<>();
        senderConfig.put(
            MAX_IN_FLIGHT_PER_SEND_CONFIG,
            config.getOrDefault(MAX_IN_FLIGHT_PER_SEND_CONFIG, DEFAULT_MAX_IN_FLIGHT_PER_SEND));
        senderConfig.put(
            STOP_ON_ERROR_CONFIG,
            config.getOrDefault(STOP_ON_ERROR_CONFIG, DEFAULT_STOP_ON_ERROR));

        // Everything that is not a sender-specific key is passed on as a producer property.
        Map<String, Object> producerConfig = new HashMap<>(config);
        producerConfig.keySet().removeAll(senderConfig.keySet());

        String clientId = ConfigLoading.loadOrThrow(config, CLIENT_ID_CONFIG, Object::toString);
        producerConfig.put(CLIENT_ID_CONFIG, clientId + "-" + nextClientIdCount(clientId));

        Integer maxInFlight = ConfigLoading.loadOrThrow(senderConfig, MAX_IN_FLIGHT_PER_SEND_CONFIG, Integer::valueOf);
        Boolean stopOnError = ConfigLoading.loadOrThrow(senderConfig, STOP_ON_ERROR_CONFIG, Boolean::valueOf);

        // Chain the setters and return the final result: depending on the reactor-kafka version,
        // each setter may return a new options instance rather than mutating the receiver, in
        // which case ignoring the return values would silently drop these settings.
        return SenderOptions.<K, V>create(producerConfig)
            .maxInFlight(maxInFlight)
            .stopOnError(stopOnError)
            .scheduler(schedulerForClient(clientId));
    }

    /** Returns the next (1-based) sequence number for the given base client id. */
    private static long nextClientIdCount(String clientId) {
        return COUNTS_BY_CLIENT_ID.computeIfAbsent(clientId, key -> new AtomicLong()).incrementAndGet();
    }

    /** Returns the scheduler associated with the given base client id, creating it on first use. */
    private static Scheduler schedulerForClient(String clientId) {
        return SCHEDULERS_BY_CLIENT_ID.computeIfAbsent(clientId, AloKafkaSender::newSchedulerForClient);
    }

    /** Creates a single-threaded scheduler named after this class and the given client id. */
    private static Scheduler newSchedulerForClient(String clientId) {
        return Schedulers.newSingle(AloKafkaSender.class.getSimpleName() + "-" + clientId);
    }
}
