package net.sigusr.mqtt.impl.protocol;

import cats.effect.Blocker;
import cats.effect.Blocker$;
import cats.effect.Concurrent;
import cats.effect.Concurrent$;
import cats.effect.ContextShift;
import cats.effect.Timer;
import cats.implicits$;
import cats.syntax.FlatMapOps$;
import fs2.RaiseThrowable$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$Compiler$;
import fs2.internal.FreeC;
import fs2.io.tcp.Socket;
import fs2.io.tcp.SocketGroup$;
import java.io.Serializable;
import java.net.InetSocketAddress;
import net.sigusr.impl.protocol.Direction;
import net.sigusr.impl.protocol.Direction$In$;
import net.sigusr.impl.protocol.Direction$Out$;
import net.sigusr.mqtt.api.ConnectionFailureReason$TransportError$;
import net.sigusr.mqtt.api.ConnectionState$Connected$;
import net.sigusr.mqtt.api.ConnectionState$Connecting$;
import net.sigusr.mqtt.api.ConnectionState$Disconnected$;
import net.sigusr.mqtt.api.ConnectionState$Error$;
import net.sigusr.mqtt.api.Errors$ConnectionFailure$;
import net.sigusr.mqtt.api.RetryConfig$;
import net.sigusr.mqtt.api.TransportConfig;
import net.sigusr.mqtt.impl.frames.Frame;
import net.sigusr.mqtt.impl.frames.Frame$;
import retry.RetryDetails;
import retry.Sleep$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scodec.stream.StreamDecoder$;
import scodec.stream.StreamEncoder$;

/* compiled from: Transport.scala */
/* loaded from: input_file:net/sigusr/mqtt/impl/protocol/Transport$.class */
/**
 * Singleton companion module (Scala {@code object Transport}) that wires a TCP socket,
 * optionally wrapped in TLS, to the frame streams exposed by a {@link TransportConnector}.
 *
 * <p>This is decompiled Scala. All effects are represented as erased {@code Object}
 * values of the effect type {@code F}; nothing runs until the caller evaluates them.
 * Methods whose names contain {@code $} are compiler-generated lambda bodies and local
 * functions of the original {@code connect}/{@code loop} definitions.
 */
public final class Transport$ implements Serializable {
    public static final Transport$ MODULE$ = new Transport$();

    private Transport$() {
    }

    /** Serializes the module as a proxy for this class instead of writing the instance itself. */
    private Object writeReplace() {
        return new ModuleSerializationProxy(Transport$.class);
    }

    /**
     * Builds the optional TLS logger handed to the TLS client.
     *
     * @param z          when {@code true} a logger is returned, otherwise {@code None}
     * @param concurrent effect type class instance used by {@code putStrLn}
     * @return {@code Some(logger)} printing each message prefixed with {@code [TLS]} between the
     *         ANSI sequences ESC[35m and ESC[0m, or {@code None} when tracing is off
     */
    private <F> Option<Function1<String, Object>> traceTLS(boolean z, Concurrent<F> concurrent) {
        return z ? Some$.MODULE$.apply(str -> {
            return package$.MODULE$.putStrLn("\u001b[35m[TLS] " + str + "\u001b[0m", concurrent);
        }) : None$.MODULE$;
    }

    /**
     * Returns a pipe that passes frames through unchanged, printing each one first when
     * {@code direction.active()} is true.
     *
     * @param direction  supplies the activation flag, the label and the colour used when printing
     * @param concurrent effect type class instance
     */
    private <F> Function1<FreeC, FreeC> tracingPipe(Direction direction, Concurrent<F> concurrent) {
        return obj -> {
            // The stream arrives boxed as fs2.Stream; unwrap it to its underlying FreeC (null-safe).
            return new Stream(tracingPipe$$anonfun$4(direction, concurrent, obj == null ? null : ((Stream) obj).fs2$Stream$$free()));
        };
    }

    /**
     * Starts the connection loop ({@code loop$1}) concurrently via {@code start()}.
     *
     * @return an effect yielding the handle of the started computation
     */
    private <F> Object connect(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer) {
        return cats.effect.implicits.package$.MODULE$.toConcurrentOps(loop$1(transportConfig, transportConnector, concurrent, contextShift, timer), concurrent).start();
    }

    /**
     * Entry point: starts the connection loop and maps the resulting fiber to a
     * {@code Transport} instance.
     *
     * <p>NOTE(review): the members of the anonymous {@code Transport} are not present in this
     * decompiled output; they live in the separate class {@code Transport$$anon$1}.
     */
    public <F> Object apply(TransportConfig<F> transportConfig, TransportConnector<F> transportConnector, Concurrent<F> concurrent, ContextShift<F> contextShift, Timer<F> timer) {
        return implicits$.MODULE$.toFunctorOps(connect(transportConfig, transportConnector, concurrent, contextShift, timer), concurrent).map(fiber -> {
            return new Transport<F>() { // from class: net.sigusr.mqtt.impl.protocol.Transport$$anon$1
            };
        });
    }

    /**
     * Per-frame body of the tracing pipe: evaluates either a {@code putStrLn} of the frame
     * (when the direction is active) or a no-op {@code unit}, then emits the frame itself.
     */
    private final /* synthetic */ FreeC tracingPipe$$anonfun$2$$anonfun$2(Direction direction, Concurrent concurrent, Frame frame) {
        return Stream$.MODULE$.map$extension(direction.active() ? Stream$.MODULE$.eval(package$.MODULE$.putStrLn(" " + direction.value() + " " + direction.color() + frame + "\u001b[0m", concurrent)) : Stream$.MODULE$.eval(Concurrent$.MODULE$.apply(concurrent).unit()), boxedUnit -> {
            return frame;
        });
    }

    /** Flat-maps every frame of {@code freeC} through the per-frame tracing body. */
    private final /* synthetic */ FreeC tracingPipe$$anonfun$4(Direction direction, Concurrent concurrent, FreeC freeC) {
        return Stream$.MODULE$.flatMap$extension(freeC, frame -> {
            return new Stream(tracingPipe$$anonfun$2$$anonfun$2(direction, concurrent, frame));
        });
    }

    /** Completion stream of the outgoing pump: sets the connection state to {@code Disconnected}. */
    private final FreeC outgoing$1$$anonfun$1(TransportConnector transportConnector) {
        return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
    }

    /**
     * Outgoing pump: takes frames from {@code transportConnector.out()}, traces them, encodes
     * them with the frame codec and writes the bytes to {@code socket} using the configured
     * write timeout. When the stream completes the state is set to {@code Disconnected}.
     *
     * @return an effect that drains the stream
     */
    private final Object outgoing$2(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, Socket socket) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.onComplete$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(transportConnector.out(), tracingPipe(Direction$Out$.MODULE$.apply(transportConfig.traceMessages()), concurrent)), StreamEncoder$.MODULE$.many(Frame$.MODULE$.codec().asEncoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(concurrent))), socket.writes(transportConfig.writeTimeout())), () -> {
            return new Stream(outgoing$1$$anonfun$1(transportConnector));
        }), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /** Completion stream of the incoming pump: sets the connection state to {@code Disconnected}. */
    private final FreeC incoming$1$$anonfun$1(TransportConnector transportConnector) {
        return Stream$.MODULE$.eval(transportConnector.stateSignal().set(ConnectionState$Disconnected$.MODULE$));
    }

    /**
     * Incoming pump: reads bytes from {@code socket} (chunk size and timeout taken from the
     * configuration), decodes them with the frame codec, traces them and feeds them into
     * {@code transportConnector.in()}. When the stream completes the state is set to
     * {@code Disconnected}.
     *
     * @return an effect that drains the stream
     */
    private final Object incoming$2(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, Socket socket) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.onComplete$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(Stream$.MODULE$.through$extension(socket.reads(transportConfig.numReadBytes(), transportConfig.readTimeout()), StreamDecoder$.MODULE$.many(Frame$.MODULE$.codec().asDecoder()).toPipeByte(RaiseThrowable$.MODULE$.fromApplicativeError(concurrent))), tracingPipe(Direction$In$.MODULE$.apply(transportConfig.traceMessages()), concurrent)), transportConnector.in()), () -> {
            return new Stream(incoming$1$$anonfun$1(transportConnector));
        }), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /** Closes {@code socket} when the observed close-signal value {@code z} is true; otherwise does nothing. */
    private final /* synthetic */ Object closeSignalWatcher$1$$anonfun$1(Concurrent concurrent, Socket socket, boolean z) {
        return z ? socket.close() : Concurrent$.MODULE$.apply(concurrent).pure(BoxedUnit.UNIT);
    }

    /**
     * Watches the discrete values of {@code transportConnector.closeSignal()} and closes the
     * socket for every value that is {@code true}.
     *
     * @return an effect that drains the watcher stream
     */
    private final Object closeSignalWatcher$2(TransportConnector transportConnector, Concurrent concurrent, Socket socket) {
        return Stream$.MODULE$.compile$extension(Stream$.MODULE$.evalMap$extension(transportConnector.closeSignal().discrete(), obj -> {
            return closeSignalWatcher$1$$anonfun$1(concurrent, socket, BoxesRunTime.unboxToBoolean(obj));
        }), Stream$Compiler$.MODULE$.syncInstance(concurrent)).drain();
    }

    /**
     * Retry callback: publishes the outcome of a failed attempt on the state signal.
     *
     * @param th           the error that caused the attempt to fail
     * @param retryDetails {@code WillDelayAndRetry} publishes {@code Connecting(nextDelay, retriesSoFar)};
     *                     {@code GivingUp} publishes {@code Error(ConnectionFailure(TransportError(th)))}
     * @return the effect that sets the state signal
     * @throws MatchError if {@code retryDetails} matches neither case
     */
    private final Object publishError$1(TransportConnector transportConnector, Throwable th, RetryDetails retryDetails) {
        if (retryDetails instanceof RetryDetails.WillDelayAndRetry) {
            RetryDetails.WillDelayAndRetry willDelayAndRetry = (RetryDetails.WillDelayAndRetry) retryDetails;
            FiniteDuration nextDelay = willDelayAndRetry.nextDelay();
            int retriesSoFar = willDelayAndRetry.retriesSoFar();
            // Null guard retained from the original pattern match; the decompiler's
            // always-true "1 != 0" operand has been dropped.
            if (nextDelay != null) {
                return transportConnector.stateSignal().set(ConnectionState$Connecting$.MODULE$.apply(nextDelay, retriesSoFar));
            }
        }
        if (retryDetails instanceof RetryDetails.GivingUp) {
            return transportConnector.stateSignal().set(ConnectionState$Error$.MODULE$.apply(Errors$ConnectionFailure$.MODULE$.apply(ConnectionFailureReason$TransportError$.MODULE$.apply(th))));
        }
        throw new MatchError(retryDetails);
    }

    /**
     * Sets the state to {@code Connected}, then races the outgoing pump, the incoming pump and
     * the close-signal watcher against each other. The race result is discarded.
     */
    private final Object pump$3(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, Socket socket) {
        return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().set(ConnectionState$Connected$.MODULE$), concurrent).flatMap(boxedUnit -> {
            return implicits$.MODULE$.toFunctorOps(cats.effect.implicits.package$.MODULE$.toConcurrentOps(cats.effect.implicits.package$.MODULE$.toConcurrentOps(outgoing$2(transportConfig, transportConnector, concurrent, socket), concurrent).race(incoming$2(transportConfig, transportConnector, concurrent, socket)), concurrent).race(closeSignalWatcher$2(transportConnector, concurrent, socket)), concurrent).map(either -> {
                // A value-returning lambda must return something: Scala's Unit result is BoxedUnit.UNIT.
                return BoxedUnit.UNIT;
            });
        });
    }

    /** Plain (non-TLS) branch: runs the pump directly on the raw socket. */
    private final Object loop$3$$anonfun$2$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, Socket socket) {
        return pump$3(transportConfig, transportConnector, concurrent, socket);
    }

    /**
     * Acquires a socket group on {@code executionContext}, opens a client socket to the
     * configured host and port, and runs the pump either directly on that socket (no TLS
     * configuration) or on a TLS client socket built from the TLS configuration.
     */
    private final /* synthetic */ Object loop$9$$anonfun$8$$anonfun$7(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, ContextShift contextShift, ExecutionContext executionContext) {
        return SocketGroup$.MODULE$.apply(executionContext, SocketGroup$.MODULE$.apply$default$2(), SocketGroup$.MODULE$.apply$default$3(), concurrent, contextShift).use(socketGroup -> {
            return socketGroup.client(new InetSocketAddress(transportConfig.host(), transportConfig.port()), socketGroup.client$default$2(), socketGroup.client$default$3(), socketGroup.client$default$4(), socketGroup.client$default$5(), socketGroup.client$default$6(), socketGroup.client$default$7(), concurrent, contextShift).use(socket -> {
                return transportConfig.tlsConfig().fold(() -> {
                    return loop$3$$anonfun$2$$anonfun$1$$anonfun$1$$anonfun$1$$anonfun$1(transportConfig, transportConnector, concurrent, socket);
                }, tLSConfig -> {
                    return implicits$.MODULE$.toFlatMapOps(tLSConfig.contextOf(executionContext), concurrent).flatMap(tLSContext -> {
                        // Named tlsSocket because a lambda parameter may not redeclare the enclosing "socket".
                        return tLSContext.client(socket, tLSConfig.tlsParameters(), traceTLS(transportConfig.traceMessages(), concurrent), concurrent, contextShift).use(tlsSocket -> {
                            return pump$3(transportConfig, transportConnector, concurrent, tlsSocket);
                        }, concurrent);
                    });
                });
            }, concurrent);
        }, concurrent);
    }

    /** One connection attempt: acquires a {@code Blocker} and runs the socket setup on its blocking context. */
    private final Object loop$11$$anonfun$10(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, ContextShift contextShift) {
        return Blocker$.MODULE$.apply(concurrent).use(obj -> {
            return loop$9$$anonfun$8$$anonfun$7(transportConfig, transportConnector, concurrent, contextShift, obj == null ? null : ((Blocker) obj).blockingContext());
        }, concurrent);
    }

    /** Recursion step: re-enters {@code loop$1} with the same arguments. */
    private final Object loop$12$$anonfun$11$$anonfun$1$$anonfun$1(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, ContextShift contextShift, Timer timer) {
        return loop$1(transportConfig, transportConnector, concurrent, contextShift, timer);
    }

    /**
     * Post-attempt step: reads the current state; if it is {@code Disconnected}, resets the
     * close signal to {@code false} and re-enters the loop, otherwise completes with unit.
     */
    private final Object loop$14$$anonfun$13(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, ContextShift contextShift, Timer timer) {
        return implicits$.MODULE$.toFlatMapOps(transportConnector.stateSignal().get(), concurrent).flatMap(connectionState -> {
            if (!ConnectionState$Disconnected$.MODULE$.equals(connectionState)) {
                return Concurrent$.MODULE$.apply(concurrent).pure(BoxedUnit.UNIT);
            }
            return FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(transportConnector.closeSignal().set(BoxesRunTime.boxToBoolean(false)), concurrent), () -> {
                return loop$12$$anonfun$11$$anonfun$1$$anonfun$1(transportConfig, transportConnector, concurrent, contextShift, timer);
            }, concurrent);
        });
    }

    /**
     * Connection loop: runs one connection attempt under {@code retryingOnAllErrors} with the
     * policy derived from the retry configuration, reporting every failure through
     * {@code publishError$1}; afterwards runs the post-attempt step, which may re-enter this loop.
     */
    private final Object loop$1(TransportConfig transportConfig, TransportConnector transportConnector, Concurrent concurrent, ContextShift contextShift, Timer timer) {
        return FlatMapOps$.MODULE$.$greater$greater$extension(implicits$.MODULE$.catsSyntaxFlatMapOps(retry.package$.MODULE$.retryingOnAllErrors().apply(RetryConfig$.MODULE$.policyOf(transportConfig.retryConfig(), concurrent), (th, retryDetails) -> {
            return publishError$1(transportConnector, th, retryDetails);
        }, () -> {
            return loop$11$$anonfun$10(transportConfig, transportConnector, concurrent, contextShift);
        }, concurrent, Sleep$.MODULE$.sleepUsingTimer(timer)), concurrent), () -> {
            return loop$14$$anonfun$13(transportConfig, transportConnector, concurrent, contextShift, timer);
        }, concurrent);
    }
}
