/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.sender.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.kafka.sender.Sender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.internals.ProducerFactory;

public class KafkaSender<K, V>
implements Sender<K, V> {
    // Class-wide SLF4J logger, named after this class.
    private static final Logger log = LoggerFactory.getLogger((String)KafkaSender.class.getName());
    // Names of the Producer methods that the proxy built by producerProxy(...) forwards
    // to the real producer; any other method invoked on that proxy is rejected.
    private static final Set<String> DELEGATE_METHODS = new HashSet<String>(Arrays.asList("partitionsFor", "metrics", "flush"));
    // Creates the producer through the factory on first subscription and caches it (see constructor).
    private final Mono<Producer<K, V>> producerMono;
    // Set to true whenever producerMono is subscribed to; read-and-cleared by close()
    // to decide whether there is a producer to close.
    private final AtomicBoolean hasProducer = new AtomicBoolean();
    // Result of options.toImmutable() taken in the constructor.
    private final SenderOptions<K, V> senderOptions;
    // Lazily created restricted view of the producer; assigned only inside the
    // synchronized producerProxy(...) method.
    private Producer<K, V> producerProxy;

    public KafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> options) {
        this.senderOptions = options.toImmutable();
        this.producerMono = Mono.fromCallable(() -> producerFactory.createProducer(this.senderOptions)).cache().doOnSubscribe(s -> this.hasProducer.set(true));
    }

    /**
     * Sends every record published by {@code records} and emits one
     * {@link SenderResult} per response.
     *
     * <p>Each subscription to the returned flux subscribes a new
     * {@code SendSubscriber} to {@code records}. Results are published on the
     * scheduler from the sender options, using {@code maxInFlight()} as the
     * prefetch value.
     *
     * @param records records to send, each carrying correlation metadata
     * @param delayError passed through to the {@code SendSubscriber}
     * @param <T> type of the correlation metadata
     * @return flux of send results
     */
    @Override
    public <T> Flux<SenderResult<T>> send(final Publisher<SenderRecord<K, V, T>> records, final boolean delayError) {
        Flux<SenderResult<T>> results = new Flux<SenderResult<T>>(){

            @Override
            public void subscribe(Subscriber<? super SenderResult<T>> s) {
                records.subscribe(new SendSubscriber<T>(s, delayError));
            }
        };
        return results
            // The (String, Throwable) overload does not expand "{}", so the message carries no placeholder.
            .doOnError(e -> log.trace("Send failed with exception", e))
            .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight());
    }

    /**
     * Sends every record published by {@code records} without reporting
     * per-record results.
     *
     * <p>Each subscription to the returned mono subscribes a new
     * {@code SendSubscriberNoResponse} to {@code records}. The record metadata it
     * emits is discarded by {@code then()}, so only completion or failure is
     * visible to the caller.
     *
     * @param records records to send
     * @return mono that terminates when the underlying send flux terminates
     */
    @Override
    public Mono<Void> send(final Publisher<ProducerRecord<K, V>> records) {
        Flux<RecordMetadata> acknowledgements = new Flux<RecordMetadata>(){

            @Override
            public void subscribe(Subscriber<? super RecordMetadata> s) {
                records.subscribe(new SendSubscriberNoResponse(s));
            }
        };
        return acknowledgements
            // The (String, Throwable) overload does not expand "{}", so the message carries no placeholder.
            .doOnError(e -> log.trace("Send failed with exception", e))
            .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight())
            .then();
    }

    /**
     * Applies {@code function} to a restricted view of the producer and emits
     * its result.
     *
     * <p>The function receives the proxy returned by {@code producerProxy(...)},
     * not the producer itself. Anything thrown while applying the function is
     * delivered as an error signal of the returned mono.
     *
     * @param function operation to run against the producer proxy
     * @param <T> type of the function's result
     * @return mono emitting the function's result
     */
    @Override
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> function) {
        // The explicit <T> witness pins the sink's element type to the function's result type.
        return this.producerMono.then(producer -> Mono.<T>create(sink -> {
            try {
                T result = function.apply(this.producerProxy(producer));
                sink.success(result);
            }
            catch (Throwable t) {
                sink.error(t);
            }
        }));
    }

    /**
     * Closes the underlying producer if {@code producerMono} has been subscribed
     * to since the last close, waiting up to the configured close timeout.
     *
     * <p>Does nothing when {@code hasProducer} is not set.
     */
    @Override
    public void close() {
        if (!this.hasProducer.getAndSet(false)) {
            return;
        }
        try {
            long timeoutMillis = this.senderOptions.closeTimeout().toMillis();
            this.producerMono.block().close(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        finally {
            // block() subscribes to producerMono, and the doOnSubscribe hook installed in the
            // constructor sets the flag back to true. Clear it again so that a repeated close()
            // does not close the producer a second time.
            this.hasProducer.set(false);
        }
    }

    /**
     * Returns the restricted producer view, creating it on first use.
     *
     * <p>The view is a dynamic proxy implementing {@link Producer}. Calls to a
     * method whose name is listed in {@code DELEGATE_METHODS} are forwarded to
     * {@code producer}, and an exception thrown by the target is rethrown
     * unwrapped. Every other call fails with
     * {@link UnsupportedOperationException}. Once created, the same proxy is
     * returned on later calls regardless of the argument.
     *
     * @param producer producer that the proxy forwards to when it is first created
     * @return the cached proxy
     */
    private synchronized Producer<K, V> producerProxy(Producer<K, V> producer) {
        if (this.producerProxy != null) {
            return this.producerProxy;
        }
        InvocationHandler forwarder = (proxy, method, args) -> {
            if (!DELEGATE_METHODS.contains(method.getName())) {
                throw new UnsupportedOperationException("Method is not supported: " + method);
            }
            try {
                return method.invoke(producer, args);
            }
            catch (InvocationTargetException e) {
                // Surface the target's own exception rather than the reflection wrapper.
                throw e.getCause();
            }
        };
        Class<?>[] proxiedTypes = new Class<?>[]{Producer.class};
        @SuppressWarnings("unchecked")
        Producer<K, V> created = (Producer<K, V>)Proxy.newProxyInstance(Producer.class.getClassLoader(), proxiedTypes, forwarder);
        this.producerProxy = created;
        return this.producerProxy;
    }

    /**
     * Immutable {@link SenderResult} holding the outcome of one send: the record
     * metadata, the exception (if any) and the caller's correlation metadata.
     *
     * @param <T> type of the correlation metadata
     */
    static class Response<T>
    implements SenderResult<T> {
        private final T correlationMetadata;
        private final RecordMetadata metadata;
        private final Exception exception;

        /**
         * @param metadata metadata reported for the record
         * @param exception exception reported for the record
         * @param correlationMetadata correlation value supplied with the request
         */
        public Response(RecordMetadata metadata, Exception exception, T correlationMetadata) {
            this.correlationMetadata = correlationMetadata;
            this.metadata = metadata;
            this.exception = exception;
        }

        /** @return the metadata passed to the constructor */
        @Override
        public RecordMetadata recordMetadata() {
            return metadata;
        }

        /** @return the exception passed to the constructor */
        @Override
        public Exception exception() {
            return exception;
        }

        /** @return the correlation metadata passed to the constructor */
        @Override
        public T correlationMetadata() {
            return correlationMetadata;
        }

        /** Renders the correlation metadata, record metadata and exception on one line. */
        @Override
        public String toString() {
            Object[] parts = new Object[]{correlationMetadata, metadata, exception};
            return String.format("Correlation=%s metadata=%s exception=%s", parts);
        }
    }

    /**
     * Send subscriber for plain {@link ProducerRecord}s: there is no correlation
     * metadata, and downstream receives only the {@link RecordMetadata} of each
     * response. It is always constructed with {@code delayError} set to
     * {@code false}.
     */
    private class SendSubscriberNoResponse
    extends AbstractSendSubscriber<ProducerRecord<K, V>, RecordMetadata, Void> {
        SendSubscriberNoResponse(Subscriber<? super RecordMetadata> actual) {
            super(actual, false);
        }

        /**
         * Forwards the metadata downstream when present. The exception is not
         * forwarded here.
         */
        @Override
        protected void handleResponse(RecordMetadata metadata, Exception e, Void correlation) {
            if (metadata != null) {
                // No cast: actual is Subscriber<? super RecordMetadata>, which accepts
                // RecordMetadata directly but not a value typed as Object.
                this.actual.onNext(metadata);
            }
        }

        /** Plain records carry no correlation metadata, so this is always {@code null}. */
        @Override
        protected Void correlationMetadata(ProducerRecord<K, V> request) {
            return null;
        }

        /** The request already is the record to send. */
        @Override
        protected ProducerRecord<K, V> producerRecord(ProducerRecord<K, V> request) {
            return request;
        }
    }

    /**
     * Send subscriber for {@link SenderRecord}s: every response is wrapped in a
     * {@link Response} carrying the request's correlation metadata and passed
     * downstream.
     *
     * @param <T> type of the correlation metadata
     */
    private class SendSubscriber<T>
    extends AbstractSendSubscriber<SenderRecord<K, V, T>, SenderResult<T>, T> {
        SendSubscriber(Subscriber<? super SenderResult<T>> actual, boolean delayError) {
            super(actual, delayError);
        }

        /** Emits a {@link Response} built from the three arguments. */
        @Override
        protected void handleResponse(RecordMetadata metadata, Exception e, T correlation) {
            Response<T> result = new Response<T>(metadata, e, correlation);
            this.actual.onNext(result);
        }

        /** Extracts the correlation metadata carried by the request. */
        @Override
        protected T correlationMetadata(SenderRecord<K, V, T> request) {
            T correlation = request.correlationMetadata();
            return correlation;
        }

        /** Extracts the Kafka record carried by the request. */
        @Override
        protected ProducerRecord<K, V> producerRecord(SenderRecord<K, V, T> request) {
            ProducerRecord<K, V> record = request.record();
            return record;
        }
    }

    private abstract class AbstractSendSubscriber<Q, S, C>
    implements Subscriber<Q> {
        protected final Subscriber<? super S> actual;
        private final boolean delayError;
        private Producer<K, V> producer;
        private AtomicInteger inflight = new AtomicInteger();
        private SubscriberState state;
        private AtomicReference<Throwable> firstException = new AtomicReference();

        AbstractSendSubscriber(Subscriber<? super S> actual, boolean delayError) {
            this.actual = actual;
            this.delayError = delayError;
            this.state = SubscriberState.INIT;
        }

        public void onSubscribe(Subscription s) {
            this.state = SubscriberState.ACTIVE;
            this.producer = (Producer)KafkaSender.this.producerMono.block();
            this.actual.onSubscribe(s);
        }

        public void onNext(Q m) {
            if (this.state == SubscriberState.FAILED) {
                return;
            }
            if (this.state == SubscriberState.COMPLETE) {
                Operators.onNextDropped(m);
                return;
            }
            this.inflight.incrementAndGet();
            C correlationMetadata = this.correlationMetadata(m);
            try {
                this.producer.send(this.producerRecord(m), (metadata, exception) -> {
                    boolean complete = this.inflight.decrementAndGet() == 0 && this.state == SubscriberState.OUTBOUND_DONE;
                    try {
                        if (exception == null) {
                            this.handleResponse(metadata, null, correlationMetadata);
                            if (complete) {
                                this.complete();
                            }
                        } else {
                            this.error(metadata, exception, correlationMetadata, complete);
                        }
                    }
                    catch (Exception e) {
                        this.error(metadata, e, correlationMetadata, complete);
                    }
                });
            }
            catch (Exception e) {
                this.inflight.decrementAndGet();
                this.error(null, e, correlationMetadata, true);
            }
        }

        public void onError(Throwable t) {
            if (this.state == SubscriberState.FAILED) {
                return;
            }
            if (this.state == SubscriberState.COMPLETE) {
                Operators.onErrorDropped((Throwable)t);
                return;
            }
            this.state = SubscriberState.FAILED;
            this.actual.onError(t);
        }

        public void onComplete() {
            if (this.state == SubscriberState.COMPLETE) {
                return;
            }
            this.state = SubscriberState.OUTBOUND_DONE;
            if (this.inflight.get() == 0) {
                this.complete();
            }
        }

        private void complete() {
            Throwable exception = this.firstException.getAndSet(null);
            if (this.delayError && exception != null) {
                this.onError(exception);
            } else {
                this.state = SubscriberState.COMPLETE;
                this.actual.onComplete();
            }
        }

        public void error(RecordMetadata metadata, Exception e, C correlation, boolean complete) {
            log.error("error {}", (Throwable)e);
            this.firstException.compareAndSet(null, e);
            this.handleResponse(metadata, e, correlation);
            if (!this.delayError || complete) {
                this.onError(e);
            }
        }

        protected abstract void handleResponse(RecordMetadata var1, Exception var2, C var3);

        protected abstract ProducerRecord<K, V> producerRecord(Q var1);

        protected abstract C correlationMetadata(Q var1);
    }

    private static enum SubscriberState {
        INIT,
        ACTIVE,
        OUTBOUND_DONE,
        COMPLETE,
        FAILED;

    }
}

