package net.sigusr.mqtt.impl.protocol;

import cats.effect.Blocker;
import cats.effect.Blocker$;
import cats.effect.Concurrent;
import cats.effect.Concurrent$;
import cats.effect.ContextShift;
import cats.effect.Timer;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$Compiler$;
import fs2.internal.FreeC;
import fs2.io.tcp.Socket;
import fs2.io.tcp.SocketGroup$;
import java.net.InetSocketAddress;
import net.sigusr.impl.protocol.Direction;
import net.sigusr.mqtt.api.ConnectionFailureReason;
import net.sigusr.mqtt.api.ConnectionState;
import net.sigusr.mqtt.api.ConnectionState$Connected$;
import net.sigusr.mqtt.api.ConnectionState$Disconnected$;
import net.sigusr.mqtt.api.Errors;
import net.sigusr.mqtt.api.RetryConfig$;
import net.sigusr.mqtt.api.TransportConfig;
import net.sigusr.mqtt.impl.frames.Frame;
import net.sigusr.mqtt.impl.frames.Frame$;
import retry.RetryDetails;
import retry.Sleep$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scodec.Codec;
import scodec.Codec$;
import scodec.stream.StreamDecoder$;
import scodec.stream.StreamEncoder$;
import shapeless.Lazy$;

/* compiled from: Transport.scala */
/* loaded from: input_file:net/sigusr/mqtt/impl/protocol/Transport$.class */
/**
 * Singleton holder for the transport layer (decompiled from the Scala object in
 * {@code Transport.scala}).
 *
 * <p>The visible code opens a TCP client socket to the host/port taken from a
 * {@link TransportConfig}, optionally wraps it with TLS, pumps {@link Frame}s between the
 * socket and a {@code TransportConnector}, and publishes {@link ConnectionState} changes
 * through the connector's state signal. Connection attempts are wrapped in
 * {@code retryingOnAllErrors}.
 *
 * <p>Methods whose names contain {@code $} are compiler-generated lambda bodies and local
 * functions; their names and signatures are kept as they are.
 */
public final class Transport$ {
    /** The single instance; assigned by the private constructor. */
    public static Transport$ MODULE$;

    static {
        // Instantiating the class populates MODULE$ (see the constructor).
        new Transport$();
    }

    /**
     * Builds the optional TLS trace logger.
     *
     * @param z          whether tracing is enabled
     * @param concurrent effect type class instance forwarded to {@code putStrLn}
     * @return {@code Some} of a function that prints its argument prefixed with
     *         {@code "[TLS] "} and wrapped in ANSI escape sequences when {@code z} is true,
     *         {@code None} otherwise
     */
    private <F> Option<Function1<String, F>> traceTLS(boolean z, Concurrent<F> concurrent) {
        return z ? new Some(str -> {
            return package$.MODULE$.putStrLn(new StringBuilder(15).append("\u001b[35m").append("[TLS] ").append(str).append("\u001b[0m").toString(), concurrent);
        }) : None$.MODULE$;
    }

    /**
     * Returns a pipe that passes every frame through unchanged after running the tracing
     * effect built by {@link #$anonfun$tracingPipe$2}.
     *
     * @param direction  supplies the {@code active}, {@code value} and {@code color} used for tracing
     * @param concurrent effect type class instance
     */
    private <F> Function1<Stream<F, Frame>, Stream<F, Frame>> tracingPipe(Direction direction, Concurrent<F> concurrent) {
        return obj -> {
            return new Stream($anonfun$tracingPipe$1(direction, concurrent, ((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Starts the connection loop ({@link #loop$1}) through {@code toConcurrentOps(...).start()}
     * and returns the resulting effect.
     */
    private <F> F connect(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer) {
        return (F) cats.effect.implicits.package$.MODULE$.toConcurrentOps(loop$1(transportConfig, concurrent, transportConnector, contextShift, timer), concurrent).start();
    }

    /**
     * Starts the connection loop via {@link #connect} and maps its result to a new anonymous
     * {@code Transport} instance.
     *
     * <p>The value produced by {@code connect} ({@code fiber}) is not used by the visible
     * mapping function.
     *
     * @return an effect yielding the {@code Transport}
     */
    public <F> F apply(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer) {
        return (F) implicits$.MODULE$.toFunctorOps(connect(transportConfig, transportConnector, concurrent, contextShift, timer), concurrent).map(fiber -> {
            return new Transport<F>() { // from class: net.sigusr.mqtt.impl.protocol.Transport$$anon$1
            };
        });
    }

    /**
     * Per-frame tracing step: when {@code direction.active()} is true, prints the direction
     * value, its color and the frame followed by the ANSI reset sequence; otherwise evaluates
     * the unit effect. In both cases the resulting stream is mapped back to {@code frame}.
     */
    public static final /* synthetic */ FreeC $anonfun$tracingPipe$2(Direction direction, Concurrent concurrent, Frame frame) {
        return Stream$.MODULE$.map$extension(direction.active() ? Stream$.MODULE$.eval(package$.MODULE$.putStrLn(new StringBuilder(6).append(" ").append(direction.value()).append(" ").append(direction.color()).append(frame).append("\u001b[0m").toString(), concurrent)) : Stream$.MODULE$.eval(Concurrent$.MODULE$.apply(concurrent).unit()), boxedUnit -> {
            return frame;
        });
    }

    /** Flat-maps every frame of {@code freeC} through {@link #$anonfun$tracingPipe$2}. */
    public static final /* synthetic */ FreeC $anonfun$tracingPipe$1(Direction direction, Concurrent concurrent, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(freeC, frame -> {
            return new Stream($anonfun$tracingPipe$2(direction, concurrent, frame));
        });
    }

    /** Stream that sets the connector's state signal to {@code Disconnected} (used by {@link #outgoing$1}). */
    public static final /* synthetic */ FreeC $anonfun$connect$2(TransportConnector transportConnector) {
        return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
    }

    /**
     * Outgoing pump: takes frames from {@code transportConnector.out()}, traces them, encodes
     * them with the {@link Frame} codec and writes the bytes to {@code socket} using
     * {@code transportConfig.writeTimeout()}. The {@code Disconnected} state stream is attached
     * with {@code onComplete$extension}, and the whole stream is compiled and drained.
     *
     * @return the effect produced by {@code drain()}
     */
    private final Object outgoing$1(Socket socket, TransportConnector transportConnector, TransportConfig transportConfig, Concurrent concurrent) {
        FreeC traced = Stream$.MODULE$.through$extension(transportConnector.out(), tracingPipe(new Direction.Out(transportConfig.traceMessages()), concurrent));
        Codec<Frame> codec = Frame$.MODULE$.codec();
        FreeC encoded = Stream$.MODULE$.through$extension(traced, StreamEncoder$.MODULE$.many(Codec$.MODULE$.apply(Lazy$.MODULE$.apply(() -> {
            return codec;
        })).asEncoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(concurrent)));
        FreeC written = Stream$.MODULE$.through$extension(encoded, socket.writes(transportConfig.writeTimeout()));
        FreeC withDisconnect = Stream$.MODULE$.onComplete$extension(written, () -> {
            return new Stream($anonfun$connect$2(transportConnector));
        });
        return Stream$.MODULE$.compile$extension(withDisconnect, Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /** Stream that sets the connector's state signal to {@code Disconnected} (used by {@link #incoming$1}). */
    public static final /* synthetic */ FreeC $anonfun$connect$4(TransportConnector transportConnector) {
        return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
    }

    /**
     * Incoming pump: reads bytes from {@code socket} (chunk size
     * {@code transportConfig.numReadBytes()}, timeout {@code transportConfig.readTimeout()}),
     * decodes them with the {@link Frame} codec, traces the frames and feeds them to
     * {@code transportConnector.in()}. The {@code Disconnected} state stream is attached with
     * {@code onComplete$extension}, and the whole stream is compiled and drained.
     *
     * @return the effect produced by {@code drain()}
     */
    private final Object incoming$1(Socket socket, TransportConfig transportConfig, Concurrent concurrent, TransportConnector transportConnector) {
        FreeC reads = socket.reads(transportConfig.numReadBytes(), transportConfig.readTimeout());
        Codec<Frame> codec = Frame$.MODULE$.codec();
        FreeC decoded = Stream$.MODULE$.through$extension(reads, StreamDecoder$.MODULE$.many(Codec$.MODULE$.apply(Lazy$.MODULE$.apply(() -> {
            return codec;
        })).asDecoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(concurrent)));
        FreeC traced = Stream$.MODULE$.through$extension(decoded, tracingPipe(new Direction.In(transportConfig.traceMessages()), concurrent));
        FreeC delivered = Stream$.MODULE$.through$extension(traced, transportConnector.in());
        FreeC withDisconnect = Stream$.MODULE$.onComplete$extension(delivered, () -> {
            return new Stream($anonfun$connect$4(transportConnector));
        });
        return Stream$.MODULE$.compile$extension(withDisconnect, Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /** Closes {@code socket} when {@code z} is true; otherwise yields a pure unit effect. */
    public static final /* synthetic */ Object $anonfun$connect$5(Socket socket, Concurrent concurrent, boolean z) {
        return z ? socket.close() : Concurrent$.MODULE$.apply(concurrent).pure(BoxedUnit.UNIT);
    }

    /**
     * Watches the discrete values of {@code transportConnector.closeSignal()} and applies
     * {@link #$anonfun$connect$5} to each one, i.e. closes the socket on a {@code true} value.
     *
     * @return the effect produced by compiling and draining the watcher stream
     */
    private static final Object closeSignalWatcher$1(Socket socket, TransportConnector transportConnector, Concurrent concurrent) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.evalMap$extension(transportConnector.closeSignal().discrete(), obj -> {
            return $anonfun$connect$5(socket, concurrent, BoxesRunTime.unboxToBoolean(obj));
        }), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /**
     * Error callback handed to the retry combinator; publishes the retry status on the
     * connector's state signal.
     *
     * <ul>
     *   <li>{@code WillDelayAndRetry} with a non-null delay: sets
     *       {@code ConnectionState.Connecting(nextDelay, retriesSoFar)}.</li>
     *   <li>{@code GivingUp}: sets {@code ConnectionState.Error} wrapping {@code th} in a
     *       {@code ConnectionFailure(TransportError)}.</li>
     * </ul>
     *
     * @param th               the error that triggered the callback
     * @param retryDetails     the retry status to publish
     * @param transportConnector owner of the state signal
     * @return the effect that sets the state signal
     * @throws MatchError if {@code retryDetails} matches neither case above
     */
    public static final Object publishError$1(Throwable th, RetryDetails retryDetails, TransportConnector transportConnector) {
        if (retryDetails instanceof RetryDetails.WillDelayAndRetry) {
            RetryDetails.WillDelayAndRetry willDelayAndRetry = (RetryDetails.WillDelayAndRetry) retryDetails;
            FiniteDuration nextDelay = willDelayAndRetry.nextDelay();
            int retriesSoFar = willDelayAndRetry.retriesSoFar();
            // A null delay does not match this case and falls through to the checks below.
            if (nextDelay != null) {
                return transportConnector.stateSignal().set(new ConnectionState.Connecting(nextDelay, retriesSoFar));
            }
        }
        if (retryDetails instanceof RetryDetails.GivingUp) {
            return transportConnector.stateSignal().set(new ConnectionState.Error(new Errors.ConnectionFailure(new ConnectionFailureReason.TransportError(th))));
        }
        throw new MatchError(retryDetails);
    }

    /** Intentionally empty: the result of the race in {@link #pump$1} is discarded. */
    public static final /* synthetic */ void $anonfun$connect$7(Either either) {
    }

    /**
     * Sets the state signal to {@code Connected}, then races the outgoing pump against the
     * incoming pump, and that race against the close-signal watcher. The race result is
     * discarded and mapped to unit.
     */
    public final Object pump$1(Socket socket, TransportConnector transportConnector, Concurrent concurrent, TransportConfig transportConfig) {
        return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().set(ConnectionState$Connected$.MODULE$), concurrent).flatMap(boxedUnit -> {
            return implicits$.MODULE$.toFunctorOps(cats.effect.implicits.package$.MODULE$.toConcurrentOps(cats.effect.implicits.package$.MODULE$.toConcurrentOps(this.outgoing$1(socket, transportConnector, transportConfig, concurrent), concurrent).race(this.incoming$1(socket, transportConfig, concurrent, transportConnector)), concurrent).race(closeSignalWatcher$1(socket, transportConnector, concurrent)), concurrent).map(either -> {
                $anonfun$connect$7(either);
                return BoxedUnit.UNIT;
            });
        });
    }

    /**
     * Single connection attempt: creates a {@code SocketGroup} on {@code executionContext},
     * opens a client socket to {@code transportConfig.host()}:{@code transportConfig.port()},
     * and runs {@link #pump$1} on it. When {@code transportConfig.tlsConfig()} is defined, the
     * socket is first wrapped by the TLS context obtained from
     * {@code tLSConfig.contextOf(executionContext)} and the pump runs on the TLS socket.
     */
    public static final /* synthetic */ Object $anonfun$connect$10(Transport$ transport$, Concurrent concurrent, ContextShift contextShift, TransportConfig transportConfig, TransportConnector transportConnector, ExecutionContext executionContext) {
        return SocketGroup$.MODULE$.apply(executionContext, SocketGroup$.MODULE$.apply$default$2(), SocketGroup$.MODULE$.apply$default$3(), concurrent, contextShift).use(socketGroup -> {
            return socketGroup.client(new InetSocketAddress(transportConfig.host(), transportConfig.port()), socketGroup.client$default$2(), socketGroup.client$default$3(), socketGroup.client$default$4(), socketGroup.client$default$5(), socketGroup.client$default$6(), socketGroup.client$default$7(), concurrent, contextShift).use(socket -> {
                return transportConfig.tlsConfig().fold(() -> {
                    return transport$.pump$1(socket, transportConnector, concurrent, transportConfig);
                }, tLSConfig -> {
                    return implicits$.MODULE$.toFlatMapOps(tLSConfig.contextOf(executionContext), concurrent).flatMap(tLSContext -> {
                        // The TLS-wrapped socket gets its own name: a Java lambda parameter may
                        // not redeclare the enclosing lambda's `socket` parameter.
                        return tLSContext.client(socket, tLSConfig.tlsParameters(), MODULE$.traceTLS(transportConfig.traceMessages(), concurrent), concurrent, contextShift).use(tlsSocket -> {
                            return transport$.pump$1(tlsSocket, transportConnector, concurrent, transportConfig);
                        }, concurrent);
                    });
                });
            }, concurrent);
        }, concurrent);
    }

    /**
     * Connection loop. Runs a connection attempt ({@link #$anonfun$connect$10}, inside a
     * {@code Blocker}) under {@code retryingOnAllErrors} with the policy derived from
     * {@code transportConfig.retryConfig()}, reporting errors through {@link #publishError$1}.
     * Afterwards it reads the state signal: if the state is {@code Disconnected} it resets the
     * close signal to {@code false} and runs the loop again; otherwise it completes with unit.
     */
    public final Object loop$1(TransportConfig transportConfig, Concurrent concurrent, TransportConnector transportConnector, ContextShift contextShift, Timer timer) {
        return FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(retry.package$.MODULE$.retryingOnAllErrors().apply(RetryConfig$.MODULE$.policyOf(transportConfig.retryConfig(), concurrent), (th, retryDetails) -> {
            return publishError$1(th, retryDetails, transportConnector);
        }, () -> {
            return Blocker$.MODULE$.apply(concurrent).use(obj -> {
                return $anonfun$connect$10(this, concurrent, contextShift, transportConfig, transportConnector, ((Blocker) obj).blockingContext());
            }, concurrent);
        }, concurrent, Sleep$.MODULE$.sleepUsingTimer(timer)), concurrent), () -> {
            return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().get(), concurrent).flatMap(connectionState -> {
                return ConnectionState$Disconnected$.MODULE$.equals(connectionState) ? FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(transportConnector.closeSignal().set(BoxesRunTime.boxToBoolean(false)), concurrent), () -> {
                    return this.loop$1(transportConfig, concurrent, transportConnector, contextShift, timer);
                }, concurrent) : Concurrent$.MODULE$.apply(concurrent).pure(BoxedUnit.UNIT);
            });
        }, concurrent);
    }

    /** Private constructor; registers the new instance as {@link #MODULE$}. */
    private Transport$() {
        MODULE$ = this;
    }
}
