package px.kinesis.stream.consumer;

import akka.Done;
import akka.Done$;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.event.LogSource$;
import akka.event.Logging$;
import akka.event.LoggingAdapter;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.scaladsl.Sink;
import px.kinesis.stream.consumer.checkpoint.CheckpointTracker;
import px.kinesis.stream.consumer.checkpoint.CheckpointTracker$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;

/* compiled from: StreamScheduler.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005=c\u0001B\f\u0019\u0001\u0005B\u0001\u0002\u000b\u0001\u0003\u0002\u0003\u0006I!\u000b\u0005\t[\u0001\u0011\t\u0011)A\u0005]!Aa\b\u0001B\u0001B\u0003%q\b\u0003\u0005D\u0001\t\u0005\t\u0015!\u0003E\u0011!i\u0005A!A!\u0002\u0017q\u0005\u0002C)\u0001\u0005\u0003\u0005\u000b1\u0002*\t\u0011a\u0003!\u0011!Q\u0001\feCQ\u0001\u0018\u0001\u0005\u0002uCq\u0001\u001b\u0001C\u0002\u0013\r\u0011\u000e\u0003\u0004q\u0001\u0001\u0006IA\u001b\u0005\bc\u0002\u0011\r\u0011\"\u0003s\u0011\u0019I\b\u0001)A\u0005g\"9!\u0010\u0001b\u0001\n\u0013Y\bbBA\b\u0001\u0001\u0006I\u0001 \u0005\b\u0003#\u0001A\u0011AA\n\u0011\u001d\t)\u0002\u0001C\u0005\u0003/Aq!a\u0007\u0001\t\u0013\ti\u0002C\u0004\u0002&\u0001!I!a\n\b\u000f\u0005E\u0002\u0004#\u0001\u00024\u00191q\u0003\u0007E\u0001\u0003kAa\u0001\u0018\u000b\u0005\u0002\u0005]\u0002bBA\u001d)\u0011\u0005\u00111\b\u0002\u0010'R\u0014X-Y7TG\",G-\u001e7fe*\u0011\u0011DG\u0001\tG>t7/^7fe*\u00111\u0004H\u0001\u0007gR\u0014X-Y7\u000b\u0005uq\u0012aB6j]\u0016\u001c\u0018n\u001d\u0006\u0002?\u0005\u0011\u0001\u000f_\u0002\u0001'\t\u0001!\u0005\u0005\u0002$M5\tAEC\u0001&\u0003\u0015\u00198-\u00197b\u0013\t9CE\u0001\u0004B]f\u0014VMZ\u0001\u0007G>tg-[4\u0011\u0005)ZS\"\u0001\r\n\u00051B\"AD\"p]N,X.\u001a:D_:4\u0017nZ\u0001\faV\u0014G.[:i'&t7\u000e\u0005\u00030k]RT\"\u0001\u0019\u000b\u0005E\u0012\u0014\u0001C:dC2\fGm\u001d7\u000b\u0005m\u0019$\"\u0001\u001b\u0002\t\u0005\\7.Y\u0005\u0003mA\u0012AaU5oWB\u0011!\u0006O\u0005\u0003sa\u0011aAU3d_J$\u0007CA\u001e=\u001b\u0005\u0019\u0014BA\u001f4\u0005\u001dqu\u000e^+tK\u0012\f!b[5mYN;\u0018\u000e^2i!\t\u0001\u0015)D\u00013\u0013\t\u0011%G\u0001\u0006LS2d7k^5uG\"\f\u0011\u0003^3s[&t\u0017\r^5p]\u001a+H/\u001e:f!\r)\u0005JS\u0007\u0002\r*\u0011q\tJ\u0001\u000bG>t7-\u001e:sK:$\u0018BA%G\u0005\u00191U\u000f^;sKB\u00111hS\u0005\u0003\u0019N\u0012A\u0001R8oK\u0006\u0011\u0011-\u001c\t\u0003\u0001>K!\u0001\u0015\u001a\u0003\u00195\u000bG/\u001a:jC2L'0\u001a:\u0002\rML8\u00
0f^3n!\t\u0019f+D\u0001U\u0015\t)6'A\u0003bGR|'/\u0003\u0002X)\nY\u0011i\u0019;peNK8\u000f^3n\u0003\t)7\r\u0005\u0002F5&\u00111L\u0012\u0002\u0011\u000bb,7-\u001e;j_:\u001cuN\u001c;fqR\fa\u0001P5oSRtDC\u00010h)\u0011yF-\u001a4\u0015\t\u0001\f'm\u0019\t\u0003U\u0001AQ!\u0014\u0005A\u00049CQ!\u0015\u0005A\u0004ICQ\u0001\u0017\u0005A\u0004eCQ!\f\u0005A\u00029BQA\u0010\u0005A\u0002}BQa\u0011\u0005A\u0002\u0011CQ\u0001\u000b\u0005A\u0002%\nq\u0001\\8hO&tw-F\u0001k!\tYg.D\u0001m\u0015\ti7'A\u0003fm\u0016tG/\u0003\u0002pY\nqAj\\4hS:<\u0017\tZ1qi\u0016\u0014\u0018\u0001\u00037pO\u001eLgn\u001a\u0011\u0002\u000fQ\u0014\u0018mY6feV\t1\u000f\u0005\u0002uo6\tQO\u0003\u0002w1\u0005Q1\r[3dWB|\u0017N\u001c;\n\u0005a,(!E\"iK\u000e\\\u0007o\\5oiR\u0013\u0018mY6fe\u0006AAO]1dW\u0016\u0014\b%A\u0005tG\",G-\u001e7feV\tA\u0010E\u0002~\u0003\u0017i\u0011A \u0006\u0004\u007f\u0006\u0005\u0011aC2p_J$\u0017N\\1u_JT1!HA\u0002\u0015\u0011\t)!a\u0002\u0002\r\u0005l\u0017M_8o\u0015\t\tI!\u0001\u0005t_\u001a$x/\u0019:f\u0013\r\tiA \u0002\n'\u000eDW\rZ;mKJ\f!b]2iK\u0012,H.\u001a:!\u0003\u0015\u0019H/\u0019:u)\u0005!\u0015!I:uCJ$8k\u00195fIVdWM]!oIJ+w-[:uKJ\u001c\u0006.\u001e;e_^tGc\u0001#\u0002\u001a!)\u0001\f\u0005a\u00023\u0006A1\u000f[;uI><h\u000e\u0006\u0003\u0002 \u0005\rBc\u0001#\u0002\"!)\u0001,\u0005a\u00023\")!0\u0005a\u0001y\u0006y1M]3bi\u0016\u001c6\r[3ek2,'\u000f\u0006\u0003\u0002*\u0005=B#\u0002?\u0002,\u00055\u0002\"B\u0017\u0013\u0001\u0004q\u0003\"\u0002 \u0013\u0001\u0004y\u0004\"\u0002\u0015\u0013\u0001\u0004I\u0013aD*ue\u0016\fWnU2iK\u0012,H.\u001a:\u0011\u0005)\"2C\u0001\u000b#)\t\t\u0019$A\u0003baBd\u0017\u0010\u0006\u0003\u0002>\u00055C\u0003CA \u0003\u000f\nI%a\u0013\u0015\u000f\u0001\f\t%a\u0011\u0002F!)QJ\u0006a\u0002\u001d\")\u0011K\u0006a\u0002%\")\u0001L\u0006a\u00023\")QF\u0006a\u0001]!)aH\u0006a\u0001\u007f!)1I\u0006a\u0001\t\")\u0001F\u0006a\u0001S\u0001")
/* loaded from: input_file:px/kinesis/stream/consumer/StreamScheduler.class */
/**
 * Owns a KCL {@link Scheduler} together with the {@link CheckpointTracker} used by its
 * record processors, and ties the scheduler's shutdown to the completion of a
 * caller-supplied termination future.
 *
 * <p>This is the Java view of a Scala class; members whose names contain {@code $} are
 * compiler-generated accessors/bridges and must keep their exact names.
 */
public class StreamScheduler {
    private final Future<Done> terminationFuture;
    private final Materializer am;
    private final ExecutionContext ec;
    private final LoggingAdapter logging;
    private final CheckpointTracker tracker;
    private final Scheduler px$kinesis$stream$consumer$StreamScheduler$$scheduler;

    /**
     * Builds the logging adapter, the checkpoint tracker and finally the KCL scheduler.
     *
     * <p>The assignment order below is significant: {@code createScheduler} reads
     * {@code am}, {@code ec}, {@code logging} and {@code tracker}, so it has to run last.
     *
     * @param consumerConfig    consumer settings (stream/app names, clients, worker id, optional KCL configs)
     * @param sink              sink handed to the record processor factory
     * @param killSwitch        kill switch handed to the record processor factory
     * @param future            termination future whose completion triggers scheduler shutdown
     * @param materializer      materializer handed to the record processor factory
     * @param actorSystem       actor system used to create the logger and the checkpoint tracker
     * @param executionContext  execution context handed to the tracker and the record processor factory
     */
    public StreamScheduler(
            ConsumerConfig consumerConfig,
            Sink<Record, NotUsed> sink,
            KillSwitch killSwitch,
            Future<Done> future,
            Materializer materializer,
            ActorSystem actorSystem,
            ExecutionContext executionContext) {
        this.terminationFuture = future;
        this.am = materializer;
        this.ec = executionContext;
        this.logging = Logging$.MODULE$.apply(actorSystem, "Kinesis", LogSource$.MODULE$.fromString());
        this.tracker = CheckpointTracker$.MODULE$.apply(
                consumerConfig.workerId(),
                consumerConfig.checkpointConfig(),
                actorSystem,
                executionContext);
        this.px$kinesis$stream$consumer$StreamScheduler$$scheduler =
                createScheduler(consumerConfig, sink, killSwitch);
    }

    /**
     * Static forwarder to the companion object's {@code apply}; arguments are passed
     * through unchanged.
     */
    public static StreamScheduler apply(
            ConsumerConfig consumerConfig,
            Sink<Record, NotUsed> sink,
            KillSwitch killSwitch,
            Future<Done> future,
            Materializer materializer,
            ActorSystem actorSystem,
            ExecutionContext executionContext) {
        return StreamScheduler$.MODULE$.apply(
                consumerConfig, sink, killSwitch, future, materializer, actorSystem, executionContext);
    }

    /** @return the logging adapter created in the constructor */
    public LoggingAdapter logging() {
        return logging;
    }

    /** @return the checkpoint tracker created in the constructor */
    private CheckpointTracker tracker() {
        return tracker;
    }

    /** @return the KCL scheduler created in the constructor (name-mangled Scala accessor) */
    public Scheduler px$kinesis$stream$consumer$StreamScheduler$$scheduler() {
        return px$kinesis$stream$consumer$StreamScheduler$$scheduler;
    }

    /**
     * Runs the scheduler on {@code SchedulerExecutionContext.Global()} and registers the
     * shutdown continuation on the termination future.
     *
     * @return the future produced by the shutdown continuation
     */
    public Future<Done> start() {
        return startSchedulerAndRegisterShutdown(SchedulerExecutionContext$.MODULE$.Global());
    }

    /**
     * Submits the scheduler to {@code executionContext}, then chains onto the termination
     * future: failures go through the generated {@code recoverWith} handler, and on
     * completion the scheduler is shut down unless a graceful shutdown has already started.
     */
    private Future<Done> startSchedulerAndRegisterShutdown(ExecutionContext executionContext) {
        executionContext.execute(px$kinesis$stream$consumer$StreamScheduler$$scheduler());
        return this.terminationFuture
                .recoverWith(
                        new StreamScheduler$$anonfun$startSchedulerAndRegisterShutdown$1(this, executionContext),
                        executionContext)
                .flatMap(done -> {
                    Scheduler running = this.px$kinesis$stream$consumer$StreamScheduler$$scheduler();
                    if (!running.gracefuleShutdownStarted()) {
                        this.logging().info("Shutting down Scheduler due to stream completion");
                        return this.px$kinesis$stream$consumer$StreamScheduler$$shutdown(running, executionContext);
                    }
                    // A graceful shutdown is already in progress; nothing more to trigger.
                    return Future$.MODULE$.successful(Done$.MODULE$);
                }, executionContext);
    }

    /**
     * Invokes the scheduler's graceful-shutdown callable inside a {@code blocking} section
     * on {@code executionContext}; when that future succeeds the checkpoint tracker is
     * shut down as well.
     *
     * @param scheduler         scheduler to shut down
     * @param executionContext  context that runs the shutdown call and the follow-up callback
     * @return future of the shutdown call (the tracker callback is registered on it, not chained)
     */
    public Future<Done> px$kinesis$stream$consumer$StreamScheduler$$shutdown(Scheduler scheduler, ExecutionContext executionContext) {
        Future<Done> shutdownResult = Future$.MODULE$.apply(
                () -> (Done$) scala.concurrent.package$.MODULE$.blocking(() -> {
                    scheduler.createGracefulShutdownCallable().call();
                    return Done$.MODULE$;
                }),
                executionContext);
        // foreach only fires on success, so the tracker is left alone if the shutdown call fails.
        shutdownResult.foreach(done$ -> {
            $anonfun$shutdown$3(this, done$);
            return BoxedUnit.UNIT;
        }, executionContext);
        return shutdownResult;
    }

    /**
     * Assembles the KCL scheduler. Each optional config on {@code consumerConfig} wins when
     * present; otherwise the matching default from the {@link ConfigsBuilder} is used.
     * The retrieval default additionally applies the configured initial stream position.
     */
    private Scheduler createScheduler(ConsumerConfig consumerConfig, Sink<Record, NotUsed> sink, KillSwitch killSwitch) {
        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                consumerConfig.streamName(),
                consumerConfig.appName(),
                consumerConfig.kinesisClient(),
                consumerConfig.dynamoClient(),
                consumerConfig.cloudwatchClient(),
                consumerConfig.workerId(),
                new RecordProcessorFactoryImpl(
                        sink, consumerConfig.workerId(), tracker(), killSwitch, this.am, this.ec, logging()));
        return new Scheduler(
                configsBuilder.checkpointConfig(),
                (CoordinatorConfig) consumerConfig.coordinatorConfig()
                        .getOrElse(() -> configsBuilder.coordinatorConfig()),
                (LeaseManagementConfig) consumerConfig.leaseManagementConfig()
                        .getOrElse(() -> configsBuilder.leaseManagementConfig()),
                configsBuilder.lifecycleConfig().taskExecutionListener(new ShardShutdownListener(tracker())),
                (MetricsConfig) consumerConfig.metricsConfig()
                        .getOrElse(() -> configsBuilder.metricsConfig()),
                configsBuilder.processorConfig().callProcessRecordsEvenForEmptyRecordList(true),
                (RetrievalConfig) consumerConfig.retrievalConfig()
                        .getOrElse(() -> configsBuilder.retrievalConfig()
                                .initialPositionInStreamExtended(consumerConfig.initialPositionInStreamExtended())));
    }

    /** Compiler-generated lambda body: shuts down the checkpoint tracker of {@code streamScheduler}. */
    public static final /* synthetic */ void $anonfun$shutdown$3(StreamScheduler streamScheduler, Done$ done$) {
        streamScheduler.tracker().shutdown();
    }
}
