/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.kafka010;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.spark.SparkEnv;
import org.apache.spark.SparkEnv$;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import org.spark_project.guava.cache.CacheBuilder;
import org.spark_project.guava.cache.CacheLoader;
import org.spark_project.guava.cache.LoadingCache;
import org.spark_project.guava.cache.RemovalListener;
import org.spark_project.guava.cache.RemovalNotification;
import org.spark_project.guava.util.concurrent.ExecutionError;
import org.spark_project.guava.util.concurrent.UncheckedExecutionException;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.mutable.MapLike;
import scala.math.Ordering;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.java8.JFunction0;
import scala.util.control.NonFatal$;

public final class CachedKafkaProducer$
implements Logging {
    public static CachedKafkaProducer$ MODULE$;
    private long cacheExpireTimeout;
    private LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> guavaCache;
    private final long defaultCacheExpireTimeout;
    private final CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cacheLoader;
    private final RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> removalListener;
    private transient Logger org$apache$spark$internal$Logging$$log_;
    private volatile byte bitmap$0;

    static {
        new CachedKafkaProducer$();
    }

    public String logName() {
        return Logging.logName$((Logging)this);
    }

    public Logger log() {
        return Logging.log$((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.logInfo$((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.logDebug$((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.logTrace$((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.logWarning$((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.logError$((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter);
    }

    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$((Logging)this, (boolean)isInterpreter, (boolean)silent);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$((Logging)this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    private long defaultCacheExpireTimeout() {
        return this.defaultCacheExpireTimeout;
    }

    private long cacheExpireTimeout$lzycompute() {
        CachedKafkaProducer$ cachedKafkaProducer$ = this;
        synchronized (cachedKafkaProducer$) {
            if ((byte)(this.bitmap$0 & 1) == 0) {
                this.cacheExpireTimeout = BoxesRunTime.unboxToLong((Object)Option$.MODULE$.apply((Object)SparkEnv$.MODULE$.get()).map((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToLong((long)CachedKafkaProducer$.$anonfun$cacheExpireTimeout$1(x$1))).getOrElse((Function0)(JFunction0.mcJ.sp & Serializable & scala.Serializable)() -> MODULE$.defaultCacheExpireTimeout()));
                this.bitmap$0 = (byte)(this.bitmap$0 | 1);
            }
        }
        return this.cacheExpireTimeout;
    }

    private long cacheExpireTimeout() {
        return (byte)(this.bitmap$0 & 1) == 0 ? this.cacheExpireTimeout$lzycompute() : this.cacheExpireTimeout;
    }

    private CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> cacheLoader() {
        return this.cacheLoader;
    }

    private RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> removalListener() {
        return this.removalListener;
    }

    private LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> guavaCache$lzycompute() {
        CachedKafkaProducer$ cachedKafkaProducer$ = this;
        synchronized (cachedKafkaProducer$) {
            if ((byte)(this.bitmap$0 & 2) == 0) {
                this.guavaCache = CacheBuilder.newBuilder().expireAfterAccess(this.cacheExpireTimeout(), TimeUnit.MILLISECONDS).removalListener(this.removalListener()).build(this.cacheLoader());
                this.bitmap$0 = (byte)(this.bitmap$0 | 2);
            }
        }
        this.cacheLoader = null;
        this.removalListener = null;
        return this.guavaCache;
    }

    private LoadingCache<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> guavaCache() {
        return (byte)(this.bitmap$0 & 2) == 0 ? this.guavaCache$lzycompute() : this.guavaCache;
    }

    /*
     * WARNING - void declaration
     */
    public KafkaProducer<byte[], byte[]> org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(java.util.Map<String, Object> producerConfiguration) {
        void var2_2;
        KafkaProducer kafkaProducer = new KafkaProducer(producerConfiguration);
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(45).append("Created a new instance of KafkaProducer for ").append(producerConfiguration).append(".").toString());
        return var2_2;
    }

    public KafkaProducer<byte[], byte[]> getOrCreate(java.util.Map<String, Object> kafkaParams) {
        KafkaProducer kafkaProducer;
        Seq<Tuple2<String, Object>> paramsSeq = this.paramsToSeq(kafkaParams);
        try {
            kafkaProducer = (KafkaProducer)this.guavaCache().get(paramsSeq);
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            boolean bl = throwable2 instanceof ExecutionException ? true : (throwable2 instanceof UncheckedExecutionException ? true : throwable2 instanceof ExecutionError);
            if (bl && throwable2.getCause() != null) {
                throw throwable2.getCause();
            }
            throw throwable;
        }
        return kafkaProducer;
    }

    /*
     * WARNING - void declaration
     */
    private Seq<Tuple2<String, Object>> paramsToSeq(java.util.Map<String, Object> kafkaParams) {
        void var2_2;
        Seq paramsSeq = (Seq)((MapLike)JavaConverters$.MODULE$.mapAsScalaMapConverter(kafkaParams).asScala()).toSeq().sortBy((Function1 & Serializable & scala.Serializable)x -> (String)x._1(), (Ordering)Ordering.String$.MODULE$);
        return var2_2;
    }

    public void close(java.util.Map<String, Object> kafkaParams) {
        Seq<Tuple2<String, Object>> paramsSeq = this.paramsToSeq(kafkaParams);
        this.guavaCache().invalidate(paramsSeq);
    }

    public void org$apache$spark$sql$kafka010$CachedKafkaProducer$$close(Seq<Tuple2<String, Object>> paramsSeq, KafkaProducer<byte[], byte[]> producer) {
        try {
            this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(40).append("Closing the KafkaProducer with params: ").append(paramsSeq.mkString("\n")).append(".").toString());
            producer.close();
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Option option = NonFatal$.MODULE$.unapply(throwable2);
            if (!option.isEmpty()) {
                Throwable e = (Throwable)option.get();
                this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Error while closing kafka producer.", e);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            throw throwable;
        }
    }

    public void clear() {
        this.logInfo((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Cleaning up guava cache.");
        this.guavaCache().invalidateAll();
    }

    private ConcurrentMap<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> getAsMap() {
        return this.guavaCache().asMap();
    }

    public static final /* synthetic */ long $anonfun$cacheExpireTimeout$1(SparkEnv x$1) {
        return x$1.conf().getTimeAsMs("spark.kafka.producer.cache.timeout", new StringBuilder(2).append(MODULE$.defaultCacheExpireTimeout()).append("ms").toString());
    }

    private CachedKafkaProducer$() {
        MODULE$ = this;
        Logging.$init$((Logging)this);
        this.defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10L);
        this.cacheLoader = new CacheLoader<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>>(){

            public KafkaProducer<byte[], byte[]> load(Seq<Tuple2<String, Object>> config) {
                java.util.Map configMap = (java.util.Map)JavaConverters$.MODULE$.mapAsJavaMapConverter((Map)((TraversableOnce)config.map((Function1 & Serializable & scala.Serializable)x -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(x._1()), x._2()), Seq$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava();
                return CachedKafkaProducer$.MODULE$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(configMap);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$load$1(scala.Tuple2 )}, serializedLambda);
            }
        };
        this.removalListener = new RemovalListener<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>>(){

            public void onRemoval(RemovalNotification<Seq<Tuple2<String, Object>>, KafkaProducer<byte[], byte[]>> notification) {
                Seq paramsSeq = (Seq)notification.getKey();
                KafkaProducer producer = (KafkaProducer)notification.getValue();
                CachedKafkaProducer$.MODULE$.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(42).append("Evicting kafka producer ").append(producer).append(" params: ").append(paramsSeq).append(", due to ").append(notification.getCause()).toString());
                CachedKafkaProducer$.MODULE$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$close((Seq<Tuple2<String, Object>>)paramsSeq, (KafkaProducer<byte[], byte[]>)producer);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$onRemoval$1(org.spark_project.guava.cache.RemovalNotification scala.collection.Seq org.apache.kafka.clients.producer.KafkaProducer )}, serializedLambda);
            }
        };
    }
}

