package onextent.akka.eventhubs;

import akka.stream.stage.AbstractInHandler;
import akka.stream.stage.GraphStageLogic;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.sys.package$;

/* compiled from: EventhubsBatchSink.scala */
/* loaded from: input_file:onextent/akka/eventhubs/EventhubsBatchSink$$anon$1.class */
/**
 * Stage logic created by {@link EventhubsBatchSink} for its single inlet.
 *
 * <p>Each element pushed to the inlet is a batch ({@code Seq}) of {@code EventhubsSinkData}.
 * The batch is converted to {@link EventData}, sent with one {@code sendSync} call on the
 * sink's current client, and every element of the batch is then acknowledged. When the send
 * (or the acknowledgement) throws, the failure is logged, the client is closed and re-created,
 * and the next batch is pulled; the failed batch is not re-sent by this class.
 *
 * <p>If the environment variable {@code AKKA_EH_DIE_ON_ERROR} equals {@code YES}, a failure
 * terminates the JVM with exit status 1 instead.
 */
public final class EventhubsBatchSink$$anon$1 extends GraphStageLogic {
    /** Environment variable that switches failure handling from "reconnect" to "exit the JVM". */
    private static final String DIE_ON_ERROR_ENV = "AKKA_EH_DIE_ON_ERROR";

    /** The sink that owns this logic; provides the inlet, logger, client and configuration. */
    private final /* synthetic */ EventhubsBatchSink $outer;

    /**
     * Closes the sink's current client and replaces it with a newly created one built from the
     * sink's connection string and executor service.
     *
     * <p>Nothing is caught here: if closing or creating the client throws, the exception
     * propagates to the caller and the sink keeps whatever client reference it had at that point.
     *
     * @throws Exception whatever {@code closeSync} or {@code createSync} throws
     */
    public void onextent$akka$eventhubs$EventhubsBatchSink$$anon$$reConnect() throws Exception {
        if (this.$outer.logger().underlying().isWarnEnabled()) {
            this.$outer.logger().underlying().warn("reconnecting sync");
        }
        this.$outer.ehClient().closeSync();
        this.$outer.ehClient_$eq(EventHubClient.createSync(
                this.$outer.onextent$akka$eventhubs$EventhubsBatchSink$$eventhubsConfig.connStr(),
                this.$outer.executorService()));
    }

    /** Logs the start of the sink and requests the first batch from the inlet. */
    public void preStart() {
        if (this.$outer.logger().underlying().isInfoEnabled()) {
            this.$outer.logger().underlying().info("starting eventhubs sink");
        }
        pull(this.$outer.in());
        super.preStart();
    }

    /**
     * Accessor for the owning sink.
     *
     * @return the {@link EventhubsBatchSink} this logic was created for
     */
    public /* synthetic */ EventhubsBatchSink onextent$akka$eventhubs$EventhubsBatchSink$$anon$$$outer() {
        return this.$outer;
    }

    /**
     * Creates the logic for the given sink and installs the inlet handler.
     *
     * @param eventhubsBatchSink the owning sink; must not be {@code null} (a {@code null}
     *     argument fails with a {@link NullPointerException} when its shape is read for the
     *     superclass constructor)
     */
    public EventhubsBatchSink$$anon$1(EventhubsBatchSink eventhubsBatchSink) {
        super(eventhubsBatchSink.m11shape());
        this.$outer = eventhubsBatchSink;
        final EventhubsBatchSink sink = eventhubsBatchSink;
        setHandler(sink.in(), new AbstractInHandler() { // from class: onextent.akka.eventhubs.EventhubsBatchSink$$anon$1$$anon$2
            /** Number of batches sent successfully so far. */
            private long count = 0L;

            public long count() {
                return this.count;
            }

            public void count_$eq(long j) {
                this.count = j;
            }

            /**
             * Sends the pushed batch, acknowledges its elements, and pulls the next batch.
             *
             * <p>The pull at the end is reached after a successful send and after a handled
             * failure; it is skipped only when failure handling itself throws.
             */
            @SuppressWarnings("unchecked")
            public void onPush() throws Exception {
                final Seq<EventhubsSinkData> batch = (Seq<EventhubsSinkData>) grab(sink.in());
                try {
                    final Seq<EventData> events = (Seq<EventData>) batch.map(item -> toEventData(item));
                    sink.ehClient().sendSync(
                            (Iterable<EventData>) JavaConverters$.MODULE$.seqAsJavaListConverter(events).asJava());
                    // Acknowledge only after the whole batch was accepted by sendSync.
                    batch.foreach(item -> {
                        acknowledge(item);
                        return BoxedUnit.UNIT;
                    });
                    // The counter advances once per batch, after the per-element debug logging.
                    count_$eq(count() + 1);
                } catch (EventHubException e) {
                    recoverFrom(e, " exception: ", "FATAL ERROR 3 - ABORT");
                } catch (Throwable th) {
                    recoverFrom(th, " unexpected: ", "FATAL ERROR 4 - ABORT");
                }
                pull(sink.in());
            }

            /** Builds an {@link EventData} from the payload and copies any optional properties onto it. */
            private EventData toEventData(EventhubsSinkData item) {
                final EventData event = EventData.create(item.payload());
                item.props().fold(() -> BoxedUnit.UNIT, props -> {
                    props.keys().foreach(key -> event.getProperties().put(key, props.apply(key)));
                    return BoxedUnit.UNIT;
                });
                return event;
            }

            /** Invokes the element's optional acknowledgements and logs the successful send at debug level. */
            private void acknowledge(EventhubsSinkData item) {
                item.ackable().fold(() -> BoxedUnit.UNIT, offset -> {
                    offset.ack();
                    return BoxedUnit.UNIT;
                });
                item.genericAck().fold(() -> BoxedUnit.UNIT, ack -> {
                    ack.apply$mcV$sp();
                    return BoxedUnit.UNIT;
                });
                if (sink.logger().underlying().isDebugEnabled()) {
                    sink.logger().underlying().debug(
                            "eventhubs sink partition {} successfully sent key {}, count = {}",
                            new Object[]{
                                BoxesRunTime.boxToInteger(sink.onextent$akka$eventhubs$EventhubsBatchSink$$partitionId),
                                item.keyOpt(),
                                BoxesRunTime.boxToLong(count())});
                }
            }

            /**
             * Shared failure path of {@code onPush}: logs the failure, exits the JVM when
             * {@code AKKA_EH_DIE_ON_ERROR} is {@code YES}, otherwise reconnects the client.
             *
             * @param failure the exception raised while sending or acknowledging
             * @param label text placed between the partition id and the failure message in the log line
             * @param fatalMessage message logged immediately before the JVM is terminated
             * @throws Exception whatever the reconnect attempt throws
             */
            private void recoverFrom(Throwable failure, String label, String fatalMessage) throws Exception {
                if (sink.logger().underlying().isErrorEnabled()) {
                    sink.logger().underlying().error(
                            "eventhub " + sink.onextent$akka$eventhubs$EventhubsBatchSink$$partitionId
                                    + label + failure.getMessage(),
                            failure);
                }
                final Object dieOnError = package$.MODULE$.env().getOrElse(DIE_ON_ERROR_ENV, () -> "");
                if ("YES".equals(dieOnError)) {
                    if (sink.logger().underlying().isErrorEnabled()) {
                        sink.logger().underlying().error(fatalMessage, failure);
                    }
                    System.exit(1);
                }
                EventhubsBatchSink$$anon$1.this.onextent$akka$eventhubs$EventhubsBatchSink$$anon$$reConnect();
            }
        });
    }
}
