package com.itv.bucky.ext.fs2;

import cats.data.EitherT;
import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.Sync;
import cats.implicits$;
import cats.syntax.EitherIdOps$;
import com.itv.bucky.AmqpClient;
import com.itv.bucky.AmqpOps;
import com.itv.bucky.PublishCommandBuilder;
import com.itv.bucky.decl.package;
import com.itv.bucky.ext.fs2.Cpackage;
import com.itv.bucky.package;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import fs2.Scheduler;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$InvariantOps$;
import fs2.Stream$ToEffect$;
import fs2.async.Promise;
import fs2.async.Ref;
import fs2.async.mutable.Signal;
import fs2.internal.FreeC;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: io.scala */
/* loaded from: input_file:com/itv/bucky/ext/fs2/io$.class */
public final class io$ implements StrictLogging {
    public static io$ MODULE$;
    private final Logger logger;

    static {
        new io$();
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public IO<Cpackage.MemoryAmqpSimulator<IO>> apply(Cpackage.MemoryAmqpSimulator.Config config, ExecutionContext executionContext, Scheduler scheduler, package.Monad<?> monad, package.MonadError<IO, Throwable> monadError, Sync<IO> sync) {
        return ((IO) fs2.async.package$.MODULE$.refOf(List$.MODULE$.empty(), sync)).flatMap(ref -> {
            return ((IO) fs2.async.package$.MODULE$.refOf(List$.MODULE$.empty(), sync)).flatMap(ref -> {
                return ((IO) fs2.async.package$.MODULE$.refOf(Predef$.MODULE$.Map().empty(), sync)).flatMap(ref -> {
                    return ((IO) fs2.async.package$.MODULE$.signalOf(BoxesRunTime.boxToInteger(0), IO$.MODULE$.ioConcurrentEffect(), executionContext)).map(signal -> {
                        return new Cpackage.MemoryAmqpSimulator<IO>(config, executionContext, scheduler, monad, monadError, sync, ref, ref, ref, signal) { // from class: com.itv.bucky.ext.fs2.io$$anon$1
                            private final Cpackage.MemoryAmqpSimulator.Config config$1;
                            private final ExecutionContext executionContext$1;
                            private final Scheduler scheduler$1;
                            private final package.Monad idMonad$1;
                            private final package.MonadError ioMonadError$1;
                            private final Sync F$1;
                            private final Ref sourceMessages$1;
                            private final Ref bindings$1;
                            private final Ref consumers$1;
                            private final Signal deliveryTagInc$1;

                            public <T> Function1<T, IO<BoxedUnit>> publisherOf(PublishCommandBuilder<T> publishCommandBuilder, Duration duration) {
                                return (Function1<T, IO<BoxedUnit>>) AmqpClient.publisherOf$(this, publishCommandBuilder, duration);
                            }

                            public <T> Duration publisherOf$default$2() {
                                return AmqpClient.publisherOf$default$2$(this);
                            }

                            public Duration publisher$default$1() {
                                return AmqpClient.publisher$default$1$(this);
                            }

                            public package.ConsumeAction consumer$default$3() {
                                return AmqpClient.consumer$default$3$(this);
                            }

                            public int consumer$default$4() {
                                return AmqpClient.consumer$default$4$(this);
                            }

                            public package.MonadError<IO, Throwable> effectMonad() {
                                return this.ioMonadError$1;
                            }

                            public package.Monad<Object> monad() {
                                return this.idMonad$1;
                            }

                            @Override // com.itv.bucky.ext.fs2.Cpackage.MemoryAmqpSimulator
                            /* renamed from: publish, reason: merged with bridge method [inline-methods] */
                            public IO publish2(package.PublishCommand publishCommand) {
                                return ((IO) fs2.async.package$.MODULE$.promise(IO$.MODULE$.ioConcurrentEffect(), this.executionContext$1)).map(promise -> {
                                    return new Tuple2(promise, new Cpackage.Message.Source(publishCommand, promise));
                                }).flatMap(tuple2 -> {
                                    if (tuple2 == null) {
                                        throw new MatchError(tuple2);
                                    }
                                    Promise promise2 = (Promise) tuple2._1();
                                    Cpackage.Message.Source source = (Cpackage.Message.Source) tuple2._2();
                                    return ((IO) this.sourceMessages$1.modify(list -> {
                                        return (List) list.$plus$colon(source, List$.MODULE$.canBuildFrom());
                                    })).flatMap(change -> {
                                        return IO$.MODULE$.apply(() -> {
                                            if (!io$.MODULE$.logger().underlying().isInfoEnabled()) {
                                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                            } else {
                                                io$.MODULE$.logger().underlying().info("Message published: {}", new Object[]{implicits$.MODULE$.toShow(publishCommand, package$Message$.MODULE$.publishCommandShowInstance()).show()});
                                                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                                            }
                                        }).flatMap(boxedUnit -> {
                                            return this.processNext(source).map(boxedUnit -> {
                                                return promise2;
                                            });
                                        });
                                    });
                                });
                            }

                            @Override // com.itv.bucky.ext.fs2.Cpackage.MemoryAmqpSimulator
                            /* renamed from: publishAndWait, reason: merged with bridge method [inline-methods] */
                            public IO publishAndWait2(package.PublishCommand publishCommand, FiniteDuration finiteDuration) {
                                return publish2(publishCommand).flatMap(promise -> {
                                    return this.completePromiseOrTimeout(promise, publishCommand, finiteDuration);
                                });
                            }

                            @Override // com.itv.bucky.ext.fs2.Cpackage.MemoryAmqpSimulator
                            /* renamed from: waitForMessagesToBeProcessed, reason: merged with bridge method [inline-methods] */
                            public IO waitForMessagesToBeProcessed2(FiniteDuration finiteDuration) {
                                return ((IO) this.sourceMessages$1.get()).flatMap(list -> {
                                    return (IO) implicits$.MODULE$.toTraverseOps(list, implicits$.MODULE$.catsStdInstancesForList()).traverse(source -> {
                                        return this.completePromiseOrTimeout(source.promise(), source.publishCommand(), finiteDuration);
                                    }, this.F$1);
                                });
                            }

                            /* renamed from: publisher, reason: merged with bridge method [inline-methods] */
                            public Function1<package.PublishCommand, IO<BoxedUnit>> m22publisher(Duration duration) {
                                return publishCommand -> {
                                    return (IO) implicits$.MODULE$.toFunctorOps(this.publish2(publishCommand), this.F$1).void();
                                };
                            }

                            public FreeC<?, BoxedUnit> consumer(package.QueueName queueName, Function1<package.Delivery, IO<package.ConsumeAction>> function1, package.ConsumeAction consumeAction, int i) {
                                return Stream$InvariantOps$.MODULE$.observe1$extension(Stream$.MODULE$.InvariantOps(Stream$.MODULE$.eval(implicits$.MODULE$.toFunctorOps(this.consumers$1.modify(map -> {
                                    return map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(queueName), function1));
                                }), this.F$1).void())), boxedUnit -> {
                                    return IO$.MODULE$.apply(() -> {
                                        if (!io$.MODULE$.logger().underlying().isInfoEnabled()) {
                                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                        } else {
                                            io$.MODULE$.logger().underlying().info("Consumer ready in {}", new Object[]{queueName});
                                            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                                        }
                                    });
                                }, this.F$1);
                            }

                            public Try<BoxedUnit> performOps(Function1<AmqpOps, Try<BoxedUnit>> function1) {
                                return (Try) function1.apply(package$.MODULE$.amqpOpsFor(binding -> {
                                    return Try$.MODULE$.apply(() -> {
                                        this.addBinding(binding).unsafeRunSync();
                                    });
                                }));
                            }

                            public Try<Object> estimatedMessageCount(package.QueueName queueName) {
                                return Try$.MODULE$.apply(() -> {
                                    return 0;
                                });
                            }

                            private IO<Ref.Change<List<package.Binding>>> addBinding(package.Binding binding) {
                                return (IO) this.bindings$1.modify(list -> {
                                    return (List) list.$plus$colon(binding, List$.MODULE$.canBuildFrom());
                                });
                            }

                            private Function1<List<package.Binding>, Option<package.Binding>> bindingFor(Cpackage.Message message) {
                                return list -> {
                                    return list.find(binding -> {
                                        return BoxesRunTime.boxToBoolean($anonfun$bindingFor$2(message, binding));
                                    });
                                };
                            }

                            private IO<BoxedUnit> processNext(Cpackage.Message message) {
                                return handlerFor(message).flatMap(either -> {
                                    return ((IO) either.fold(retry -> {
                                        return this.retryOrFail(retry);
                                    }, function1 -> {
                                        return this.handle(message, function1);
                                    })).map(boxedUnit -> {
                                        $anonfun$processNext$4(boxedUnit);
                                        return BoxedUnit.UNIT;
                                    });
                                });
                            }

                            private IO<Either<Cpackage.Message.Retry, Function1<package.Delivery, IO<package.ConsumeAction>>>> handlerFor(Cpackage.Message message) {
                                return (IO) new EitherT(((IO) this.bindings$1.get()).map(bindingFor(message)).map(option -> {
                                    return (Either) option.fold(() -> {
                                        return EitherIdOps$.MODULE$.asLeft$extension(implicits$.MODULE$.catsSyntaxEitherId(package$Message$.MODULE$.noBindingFound(message)));
                                    }, binding -> {
                                        return EitherIdOps$.MODULE$.asRight$extension(implicits$.MODULE$.catsSyntaxEitherId(binding));
                                    });
                                })).flatMap(binding -> {
                                    return new EitherT(((IO) this.consumers$1.get()).map(map -> {
                                        return (Either) map.get(binding.queueName()).fold(() -> {
                                            return EitherIdOps$.MODULE$.asLeft$extension(implicits$.MODULE$.catsSyntaxEitherId(package$Message$.MODULE$.noConsumerFound(message)));
                                        }, function1 -> {
                                            return EitherIdOps$.MODULE$.asRight$extension(implicits$.MODULE$.catsSyntaxEitherId(function1));
                                        });
                                    })).map(function1 -> {
                                        return function1;
                                    }, this.F$1);
                                }, this.F$1).value();
                            }

                            /* JADX INFO: Access modifiers changed from: private */
                            public IO<BoxedUnit> retryOrFail(Cpackage.Message.Retry retry) {
                                Cpackage.ConsumeActionResult notConsumerFound;
                                if (retry.retries() <= this.config$1.retryPolicy().total()) {
                                    return (IO) implicits$.MODULE$.toFunctorOps(Stream$ToEffect$.MODULE$.last$extension(Stream$InvariantOps$.MODULE$.compile$extension(Stream$.MODULE$.InvariantOps(Stream$InvariantOps$.MODULE$.$plus$plus$extension(Stream$.MODULE$.InvariantOps(this.scheduler$1.sleep_(this.config$1.retryPolicy().sleep(), IO$.MODULE$.ioConcurrentEffect(), this.executionContext$1)), () -> {
                                        return new Stream($anonfun$retryOrFail$1(this, retry));
                                    }))), this.F$1), this.F$1).void();
                                }
                                Cpackage.Message.Issue issue = retry.issue();
                                if (package$Message$Issue$NoBindingFound$.MODULE$.equals(issue)) {
                                    notConsumerFound = package$.MODULE$.PublishCommandConsumeActionExt(retry.source().publishCommand()).notBindingFound(this.config$1.retryPolicy());
                                } else {
                                    if (!package$Message$Issue$NoConsumerFound$.MODULE$.equals(issue)) {
                                        throw new MatchError(issue);
                                    }
                                    notConsumerFound = package$.MODULE$.PublishCommandConsumeActionExt(retry.source().publishCommand()).notConsumerFound(this.config$1.retryPolicy());
                                }
                                return completePromise(retry, notConsumerFound);
                            }

                            /* JADX INFO: Access modifiers changed from: private */
                            public IO<BoxedUnit> handle(Cpackage.Message message, Function1<package.Delivery, IO<package.ConsumeAction>> function1) {
                                return ((IO) this.deliveryTagInc$1.modify(i -> {
                                    return i + 1;
                                })).flatMap(change -> {
                                    return ((IO) fs2.async.package$.MODULE$.fork(((IO) function1.apply(this.deliveryFor(change, package$Message$.MODULE$.sourceFrom(message)))).flatMap(consumeAction -> {
                                        return this.completePromise(message, package$.MODULE$.ConsumeActionExt(consumeAction).result());
                                    }), IO$.MODULE$.ioConcurrentEffect(), this.executionContext$1)).map(boxedUnit -> {
                                        $anonfun$handle$4(boxedUnit);
                                        return BoxedUnit.UNIT;
                                    });
                                });
                            }

                            private package.Delivery deliveryFor(Ref.Change<Object> change, Cpackage.Message.Source source) {
                                return new package.Delivery(source.publishCommand().body(), new package.ConsumerTag("ctag"), new package.Envelope(BoxesRunTime.unboxToInt(change.now()), false, source.publishCommand().exchange(), source.publishCommand().routingKey()), source.publishCommand().basicProperties());
                            }

                            /* JADX INFO: Access modifiers changed from: private */
                            public IO<Cpackage.ConsumeActionResult> completePromiseOrTimeout(Promise<IO, Cpackage.ConsumeActionResult> promise, package.PublishCommand publishCommand, FiniteDuration finiteDuration) {
                                return ((IO) promise.timedGet(finiteDuration, this.scheduler$1)).flatMap(option -> {
                                    return (IO) option.fold(() -> {
                                        return this.timeoutFor(publishCommand, finiteDuration);
                                    }, consumeActionResult -> {
                                        return IO$.MODULE$.pure(consumeActionResult);
                                    });
                                });
                            }

                            /* JADX INFO: Access modifiers changed from: private */
                            public IO<BoxedUnit> completePromise(Cpackage.Message message, Cpackage.ConsumeActionResult consumeActionResult) {
                                return ((IO) package$Message$.MODULE$.sourceFrom(message).promise().complete(consumeActionResult)).flatMap(boxedUnit -> {
                                    return this.showResult(package$Message$.MODULE$.sourceFrom(message).publishCommand(), consumeActionResult).map(boxedUnit -> {
                                        $anonfun$completePromise$2(boxedUnit);
                                        return BoxedUnit.UNIT;
                                    });
                                });
                            }

                            /* JADX INFO: Access modifiers changed from: private */
                            public IO<Cpackage.ConsumeActionResult> timeoutFor(package.PublishCommand publishCommand, FiniteDuration finiteDuration) {
                                Cpackage.ConsumeActionResult timeout = package$.MODULE$.PublishCommandConsumeActionExt(publishCommand).timeout(finiteDuration);
                                return showResult(publishCommand, timeout).map(boxedUnit -> {
                                    return timeout;
                                });
                            }

                            private IO<BoxedUnit> showResult(package.PublishCommand publishCommand, Cpackage.ConsumeActionResult consumeActionResult) {
                                return IO$.MODULE$.apply(() -> {
                                    if (!io$.MODULE$.logger().underlying().isInfoEnabled()) {
                                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    } else {
                                        io$.MODULE$.logger().underlying().info("{} for {}", new String[]{implicits$.MODULE$.toShow(consumeActionResult, package$ConsumeActionResult$.MODULE$.showInstances()).show(), implicits$.MODULE$.toShow(publishCommand, package$Message$.MODULE$.publishCommandShowInstance()).show()});
                                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                                    }
                                });
                            }

                            /* renamed from: consumer, reason: collision with other method in class */
                            public /* bridge */ /* synthetic */ Object m21consumer(package.QueueName queueName, Function1 function1, package.ConsumeAction consumeAction, int i) {
                                return new Stream(consumer(queueName, (Function1<package.Delivery, IO<package.ConsumeAction>>) function1, consumeAction, i));
                            }

                            public static final /* synthetic */ boolean $anonfun$bindingFor$2(Cpackage.Message message, package.Binding binding) {
                                package.PublishCommand publishCommand = package$Message$.MODULE$.sourceFrom(message).publishCommand();
                                package.RoutingKey routingKey = publishCommand.routingKey();
                                package.RoutingKey routingKey2 = binding.routingKey();
                                if (routingKey != null ? routingKey.equals(routingKey2) : routingKey2 == null) {
                                    package.ExchangeName exchange = publishCommand.exchange();
                                    package.ExchangeName exchangeName = binding.exchangeName();
                                    if (exchange != null ? exchange.equals(exchangeName) : exchangeName == null) {
                                        return true;
                                    }
                                }
                                return false;
                            }

                            public static final /* synthetic */ void $anonfun$processNext$4(BoxedUnit boxedUnit) {
                            }

                            public static final /* synthetic */ FreeC $anonfun$retryOrFail$1(io$$anon$1 io__anon_1, Cpackage.Message.Retry retry) {
                                return Stream$.MODULE$.eval(io__anon_1.processNext(retry));
                            }

                            public static final /* synthetic */ void $anonfun$handle$4(BoxedUnit boxedUnit) {
                            }

                            public static final /* synthetic */ void $anonfun$completePromise$2(BoxedUnit boxedUnit) {
                            }

                            {
                                this.config$1 = config;
                                this.executionContext$1 = executionContext;
                                this.scheduler$1 = scheduler;
                                this.idMonad$1 = monad;
                                this.ioMonadError$1 = monadError;
                                this.F$1 = sync;
                                this.sourceMessages$1 = ref;
                                this.bindings$1 = ref;
                                this.consumers$1 = ref;
                                this.deliveryTagInc$1 = signal;
                                AmqpClient.$init$(this);
                            }
                        };
                    });
                });
            });
        });
    }

    private io$() {
        MODULE$ = this;
        StrictLogging.$init$(this);
    }
}
