package fs2.aws.dynamodb;

import cats.Applicative;
import cats.effect.kernel.Async;
import cats.effect.kernel.GenConcurrent;
import cats.effect.std.Dispatcher;
import cats.effect.std.Dispatcher$;
import cats.effect.std.Queue;
import cats.effect.std.Queue$;
import cats.implicits$;
import cats.syntax.ApplicativeIdOps$;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.model.Record;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$NestedStreamOps$;
import fs2.compat.NotGiven$;
import fs2.concurrent.SignallingRef;
import fs2.concurrent.SignallingRef$;
import java.util.UUID;
import scala.DummyImplicit$;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: package.scala */
/* loaded from: input_file:fs2/aws/dynamodb/package$.class */
public final class package$ {
    public static final package$ MODULE$ = new package$();

    private <F> Function1<IRecordProcessorFactory, F> defaultWorker(KinesisClientLibConfiguration kinesisClientLibConfiguration, AmazonDynamoDBStreams amazonDynamoDBStreams, AmazonDynamoDB amazonDynamoDB, AmazonCloudWatch amazonCloudWatch, Applicative<F> applicative) {
        return iRecordProcessorFactory -> {
            return ApplicativeIdOps$.MODULE$.pure$extension(implicits$.MODULE$.catsSyntaxApplicativeId(StreamsWorkerFactory.createDynamoDbStreamsWorker(iRecordProcessorFactory, kinesisClientLibConfiguration, new AmazonDynamoDBStreamsAdapterClient(amazonDynamoDBStreams), amazonDynamoDB, amazonCloudWatch)), applicative);
        };
    }

    public <F> Stream<F, CommittableRecord> readFromDynamDBStream(String str, String str2, Async<F> async) {
        return readFromDynamoDBStream(new KinesisClientLibConfiguration(str, str2, DefaultAWSCredentialsProviderChain.getInstance(), new StringBuilder(1).append(scala.sys.process.package$.MODULE$.stringToProcess("hostname").$bang$bang().trim()).append(":").append(UUID.randomUUID()).toString()).withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON), readFromDynamoDBStream$default$2(), readFromDynamoDBStream$default$3(), readFromDynamoDBStream$default$4(), readFromDynamoDBStream$default$5(), async, async);
    }

    public <F> Stream<F, CommittableRecord> readFromDynamoDBStream(KinesisClientLibConfiguration kinesisClientLibConfiguration, AmazonDynamoDBStreams amazonDynamoDBStreams, AmazonDynamoDB amazonDynamoDB, AmazonCloudWatch amazonCloudWatch, KinesisStreamSettings kinesisStreamSettings, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return readFromDynamoDBStream(defaultWorker(kinesisClientLibConfiguration, amazonDynamoDBStreams, amazonDynamoDB, amazonCloudWatch, async), kinesisStreamSettings, async, async);
    }

    public <F> Stream<F, CommittableRecord> readFromDynamoDBStream(Function1<IRecordProcessorFactory, F> function1, KinesisStreamSettings kinesisStreamSettings, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return Stream$.MODULE$.resource(Dispatcher$.MODULE$.apply(async), async).flatMap(dispatcher -> {
            return Stream$.MODULE$.eval(Queue$.MODULE$.bounded(kinesisStreamSettings.bufferSize(), async)).flatMap(queue -> {
                return Stream$.MODULE$.eval(SignallingRef$.MODULE$.apply(BoxesRunTime.boxToBoolean(false), async)).flatMap(signallingRef -> {
                    return instantiateWorker$1(dispatcher, queue, signallingRef, function1, async).flatMap(worker -> {
                        return Stream$.MODULE$.fromQueueUnterminated(queue, Stream$.MODULE$.fromQueueUnterminated$default$2(), async).interruptWhen(signallingRef, async);
                    }, NotGiven$.MODULE$.default());
                }, NotGiven$.MODULE$.default());
            }, NotGiven$.MODULE$.default());
        }, NotGiven$.MODULE$.default());
    }

    public <F> AmazonDynamoDBStreams readFromDynamoDBStream$default$2() {
        return (AmazonDynamoDBStreams) AmazonDynamoDBStreamsClientBuilder.standard().withRegion(Regions.US_EAST_1).build();
    }

    public <F> AmazonDynamoDB readFromDynamoDBStream$default$3() {
        return (AmazonDynamoDB) AmazonDynamoDBClientBuilder.standard().withRegion(Regions.US_EAST_1).build();
    }

    public <F> AmazonCloudWatch readFromDynamoDBStream$default$4() {
        return (AmazonCloudWatch) AmazonCloudWatchClientBuilder.standard().withRegion(Regions.US_EAST_1).build();
    }

    public <F> KinesisStreamSettings readFromDynamoDBStream$default$5() {
        return KinesisStreamSettings$.MODULE$.defaultInstance();
    }

    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, Record>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings, int i, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return stream -> {
            return Stream$NestedStreamOps$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.NestedStreamOps(stream.through(fs2.aws.core.package$.MODULE$.groupBy(committableRecord -> {
                return cats.effect.package$.MODULE$.Sync().apply(async).delay(() -> {
                    return committableRecord.shardId();
                });
            }, async)).map(tuple2 -> {
                if (tuple2 != null) {
                    return ((Stream) tuple2._2()).broadcastThrough(ScalaRunTime$.MODULE$.wrapRefArray(new Function1[]{stream -> {
                        return stream.groupWithin(kinesisCheckpointSettings.maxBatchSize(), kinesisCheckpointSettings.maxBatchWait(), async).collect(new package$$anonfun$$nestedInanonfun$checkpointRecords$1$1()).flatMap(tuple2 -> {
                            if (tuple2 == null) {
                                throw new MatchError(tuple2);
                            }
                            int _1$mcI$sp = tuple2._1$mcI$sp();
                            CommittableRecord committableRecord2 = (CommittableRecord) tuple2._2();
                            return Stream$.MODULE$.eval(implicits$.MODULE$.toFunctorOps(committableRecord2.checkpoint(_1$mcI$sp, async), async).as(committableRecord2.record())).drain();
                        }, NotGiven$.MODULE$.default());
                    }, bypass$1()}), async);
                }
                throw new MatchError(tuple2);
            })), async);
        };
    }

    public <F> KinesisCheckpointSettings checkpointRecords$default$1() {
        return KinesisCheckpointSettings$.MODULE$.defaultInstance();
    }

    public <F> int checkpointRecords$default$2() {
        return 10;
    }

    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, BoxedUnit>> checkpointRecords_(KinesisCheckpointSettings kinesisCheckpointSettings, Async<F> async, GenConcurrent<F, Throwable> genConcurrent) {
        return stream -> {
            return stream.through(MODULE$.checkpointRecords(kinesisCheckpointSettings, MODULE$.checkpointRecords$default$2(), async, async)).map(record -> {
                $anonfun$checkpointRecords_$2(record);
                return BoxedUnit.UNIT;
            });
        };
    }

    public <F> KinesisCheckpointSettings checkpointRecords_$default$1() {
        return KinesisCheckpointSettings$.MODULE$.defaultInstance();
    }

    public static final /* synthetic */ void $anonfun$readFromDynamoDBStream$2(Dispatcher dispatcher, Queue queue, CommittableRecord committableRecord) {
        dispatcher.unsafeRunSync(queue.offer(committableRecord));
    }

    private static final Stream instantiateWorker$1(Dispatcher dispatcher, Queue queue, SignallingRef signallingRef, Function1 function1, Async async) {
        return Stream$.MODULE$.bracket(implicits$.MODULE$.toFlatMapOps(function1.apply(() -> {
            return new RecordProcessor(committableRecord -> {
                $anonfun$readFromDynamoDBStream$2(dispatcher, queue, committableRecord);
                return BoxedUnit.UNIT;
            });
        }), async).flatTap(worker -> {
            return cats.effect.package$.MODULE$.Concurrent().apply(async, DummyImplicit$.MODULE$.dummyImplicit()).start(implicits$.MODULE$.toFlatMapOps(cats.effect.package$.MODULE$.Async().apply(async).blocking(() -> {
                worker.run();
            }), async).flatTap(boxedUnit -> {
                return signallingRef.set(BoxesRunTime.boxToBoolean(true));
            }));
        }), worker2 -> {
            return cats.effect.package$.MODULE$.Async().apply(async).blocking(() -> {
                worker2.shutdown();
            });
        });
    }

    private static final Function1 bypass$1() {
        return stream -> {
            return stream.map(committableRecord -> {
                return committableRecord.record();
            });
        };
    }

    public static final /* synthetic */ void $anonfun$checkpointRecords_$2(Record record) {
    }

    private package$() {
    }
}
