package net.sigusr.mqtt.impl.protocol;

import cats.effect.kernel.GenConcurrent;
import cats.effect.kernel.GenTemporal;
import cats.effect.kernel.syntax.GenSpawnOps$;
import cats.effect.kernel.syntax.GenSpawnOps_$;
import cats.effect.std.Console;
import cats.effect.std.Console$;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import com.comcast.ip4s.SocketAddress;
import fs2.Compiler$;
import fs2.Compiler$Target$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.compat.NotGiven$;
import fs2.io.net.Network;
import fs2.io.net.Network$;
import fs2.io.net.Socket;
import fs2.io.net.tls.TLSContext;
import net.sigusr.impl.protocol.Direction;
import net.sigusr.mqtt.api.ConnectionFailureReason;
import net.sigusr.mqtt.api.ConnectionState;
import net.sigusr.mqtt.api.ConnectionState$Connected$;
import net.sigusr.mqtt.api.ConnectionState$Disconnected$;
import net.sigusr.mqtt.api.Errors;
import net.sigusr.mqtt.api.RetryConfig$;
import net.sigusr.mqtt.api.TransportConfig;
import net.sigusr.mqtt.impl.frames.Frame;
import net.sigusr.mqtt.impl.frames.Frame$;
import retry.RetryDetails;
import retry.Sleep$;
import scala.DummyImplicit$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scodec.Codec;
import scodec.Codec$;
import scodec.stream.StreamDecoder$;
import scodec.stream.StreamEncoder$;
import shapeless.Lazy$;

/* compiled from: Transport.scala */
/* loaded from: input_file:net/sigusr/mqtt/impl/protocol/Transport$.class */
public final class Transport$ {
    public static final Transport$ MODULE$ = new Transport$();

    private <F> Option<Function1<Function0<String>, F>> traceTLS(boolean z, Console<F> console) {
        return z ? new Some(function0 -> {
            return Console$.MODULE$.apply(console).println(new StringBuilder(15).append("\u001b[35m").append("[TLS] ").append(function0.apply()).append("\u001b[0m").toString(), implicits$.MODULE$.catsStdShowForString());
        }) : None$.MODULE$;
    }

    private <F> Function1<Stream<F, Frame>, Stream<F, Frame>> tracingPipe(Direction direction, GenConcurrent<F, Throwable> genConcurrent, Console<F> console) {
        return stream -> {
            return stream.flatMap(frame -> {
                return (direction.active() ? Stream$.MODULE$.eval(Console$.MODULE$.apply(console).println(new StringBuilder(6).append(" ").append(direction.value()).append(" ").append(direction.color()).append(frame).append("\u001b[0m").toString(), implicits$.MODULE$.catsStdShowForString())) : Stream$.MODULE$.eval(cats.effect.package$.MODULE$.Concurrent().apply(genConcurrent, DummyImplicit$.MODULE$.dummyImplicit()).unit())).map(boxedUnit -> {
                    return frame;
                });
            }, NotGiven$.MODULE$.default());
        };
    }

    private <F> F connect(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, GenTemporal<F, Throwable> genTemporal, Network<F> network, Console<F> console) {
        return (F) GenSpawnOps$.MODULE$.start$extension(cats.effect.implicits$.MODULE$.genSpawnOps(loop$1(transportConfig, genTemporal, transportConnector, network, console), genTemporal), genTemporal);
    }

    public <F> F apply(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, GenTemporal<F, Throwable> genTemporal, Network<F> network, Console<F> console) {
        return (F) implicits$.MODULE$.toFunctorOps(connect(transportConfig, transportConnector, genTemporal, network, console), genTemporal).map(fiber -> {
            return new Transport<F>() { // from class: net.sigusr.mqtt.impl.protocol.Transport$$anon$1
            };
        });
    }

    private final Object outgoing$1(Socket socket, TransportConnector transportConnector, TransportConfig transportConfig, GenTemporal genTemporal, Console console) {
        Stream through = transportConnector.out().through(tracingPipe(new Direction.Out(transportConfig.traceMessages()), genTemporal, console));
        StreamEncoder$ streamEncoder$ = StreamEncoder$.MODULE$;
        Codec$ codec$ = Codec$.MODULE$;
        Codec<Frame> codec = Frame$.MODULE$.codec();
        return through.through(streamEncoder$.many(codec$.apply(Lazy$.MODULE$.apply(() -> {
            return codec;
        })).asEncoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(genTemporal))).through(socket.writes()).onComplete(() -> {
            return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain();
    }

    private final Object incoming$1(Socket socket, GenTemporal genTemporal, TransportConfig transportConfig, Console console, TransportConnector transportConnector) {
        Stream reads = socket.reads();
        StreamDecoder$ streamDecoder$ = StreamDecoder$.MODULE$;
        Codec$ codec$ = Codec$.MODULE$;
        Codec<Frame> codec = Frame$.MODULE$.codec();
        return reads.through(streamDecoder$.many(codec$.apply(Lazy$.MODULE$.apply(() -> {
            return codec;
        })).asDecoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(genTemporal))).through(tracingPipe(new Direction.In(transportConfig.traceMessages()), genTemporal, console)).through(transportConnector.in()).onComplete(() -> {
            return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain();
    }

    public static final /* synthetic */ Object $anonfun$connect$5(Socket socket, GenTemporal genTemporal, boolean z) {
        return z ? socket.endOfOutput() : cats.effect.package$.MODULE$.Concurrent().apply(genTemporal, DummyImplicit$.MODULE$.dummyImplicit()).pure(BoxedUnit.UNIT);
    }

    private static final Object closeSignalWatcher$1(Socket socket, TransportConnector transportConnector, GenTemporal genTemporal) {
        return transportConnector.closeSignal().discrete().evalMap(obj -> {
            return $anonfun$connect$5(socket, genTemporal, BoxesRunTime.unboxToBoolean(obj));
        }).compile(Compiler$.MODULE$.target(Compiler$Target$.MODULE$.forConcurrent(genTemporal))).drain();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final Object publishError$1(Throwable th, RetryDetails retryDetails, TransportConnector transportConnector) {
        if (retryDetails instanceof RetryDetails.WillDelayAndRetry) {
            RetryDetails.WillDelayAndRetry willDelayAndRetry = (RetryDetails.WillDelayAndRetry) retryDetails;
            FiniteDuration nextDelay = willDelayAndRetry.nextDelay();
            int retriesSoFar = willDelayAndRetry.retriesSoFar();
            if (nextDelay != null && 1 != 0) {
                return transportConnector.stateSignal().set(new ConnectionState.Connecting(nextDelay, retriesSoFar));
            }
        }
        if (retryDetails instanceof RetryDetails.GivingUp) {
            return transportConnector.stateSignal().set(new ConnectionState.Error(new Errors.ConnectionFailure(new ConnectionFailureReason.TransportError(th))));
        }
        throw new MatchError(retryDetails);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Object pump$1(Socket socket, TransportConnector transportConnector, GenTemporal genTemporal, TransportConfig transportConfig, Console console) {
        return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().set(ConnectionState$Connected$.MODULE$), genTemporal).flatMap(boxedUnit -> {
            return implicits$.MODULE$.toFunctorOps(GenSpawnOps_$.MODULE$.race$extension(cats.effect.implicits$.MODULE$.genSpawnOps_(GenSpawnOps_$.MODULE$.race$extension(cats.effect.implicits$.MODULE$.genSpawnOps_(this.outgoing$1(socket, transportConnector, transportConfig, genTemporal, console)), this.incoming$1(socket, genTemporal, transportConfig, console, transportConnector), genTemporal)), closeSignalWatcher$1(socket, transportConnector, genTemporal), genTemporal), genTemporal).map(either -> {
                BoxedUnit.UNIT;
                return BoxedUnit.UNIT;
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final Object loop$1(TransportConfig transportConfig, GenTemporal genTemporal, TransportConnector transportConnector, Network network, Console console) {
        return FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(retry.package$.MODULE$.retryingOnAllErrors().apply(RetryConfig$.MODULE$.policyOf(transportConfig.retryConfig(), genTemporal), (th, retryDetails) -> {
            return publishError$1(th, retryDetails, transportConnector);
        }, () -> {
            Network apply = Network$.MODULE$.apply(network);
            return apply.client(new SocketAddress(transportConfig.host(), transportConfig.port()), apply.client$default$2()).use(socket -> {
                return transportConfig.tlsConfig().fold(() -> {
                    return this.pump$1(socket, transportConnector, genTemporal, transportConfig, console);
                }, tLSConfig -> {
                    return implicits$.MODULE$.toFlatMapOps(tLSConfig.contextOf(), genTemporal).flatMap(tLSContext -> {
                        TLSContext.SocketBuilder withParameters = tLSContext.clientBuilder(socket).withParameters(tLSConfig.tlsParameters());
                        return ((TLSContext.SocketBuilder) MODULE$.traceTLS(transportConfig.traceMessages(), console).fold(() -> {
                            return withParameters;
                        }, function1 -> {
                            return withParameters.withLogging(function1);
                        })).build().use(socket -> {
                            return this.pump$1(socket, transportConnector, genTemporal, transportConfig, console);
                        }, genTemporal);
                    });
                });
            }, genTemporal);
        }, genTemporal, Sleep$.MODULE$.sleepUsingTemporal(genTemporal)), genTemporal), () -> {
            return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().get(), genTemporal).flatMap(connectionState -> {
                return ConnectionState$Disconnected$.MODULE$.equals(connectionState) ? FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(transportConnector.closeSignal().set(BoxesRunTime.boxToBoolean(false)), genTemporal), () -> {
                    return this.loop$1(transportConfig, genTemporal, transportConnector, network, console);
                }, genTemporal) : cats.effect.package$.MODULE$.Concurrent().apply(genTemporal, DummyImplicit$.MODULE$.dummyImplicit()).unit();
            });
        }, genTemporal);
    }

    private Transport$() {
    }
}
