package com.thing2x.smqd.bridge;

import akka.actor.ActorSystem;
import akka.dispatch.MessageDispatcher;
import akka.http.scaladsl.Http$;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.ContentType;
import akka.http.scaladsl.model.ContentType$;
import akka.http.scaladsl.model.ContentTypes$;
import akka.http.scaladsl.model.HttpEntity;
import akka.http.scaladsl.model.HttpEntity$;
import akka.http.scaladsl.model.HttpMethod;
import akka.http.scaladsl.model.HttpMethods$;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpRequest$;
import akka.http.scaladsl.model.Uri$;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import akka.util.ByteString;
import com.thing2x.smqd.FilterPath;
import com.thing2x.smqd.FilterPath$;
import com.thing2x.smqd.Smqd;
import com.thing2x.smqd.TopicPath;
import com.thing2x.smqd.plugin.Bridge;
import com.thing2x.smqd.plugin.BridgeDriver;
import com.thing2x.smqd.util.ConfigUtil$;
import com.typesafe.config.Config;
import io.netty.buffer.ByteBuf;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Left;
import scala.util.Right;
import scala.util.Success;
import scala.util.Try;

/* compiled from: HttpBridge.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\rc\u0001B\u0001\u0003\u0001-\u0011\u0001\u0003\u0013;ua\n\u0013\u0018\u000eZ4f\tJLg/\u001a:\u000b\u0005\r!\u0011A\u00022sS\u0012<WM\u0003\u0002\u0006\r\u0005!1/\\9e\u0015\t9\u0001\"A\u0004uQ&twM\r=\u000b\u0003%\t1aY8n\u0007\u0001\u00192\u0001\u0001\u0007\u0013!\ti\u0001#D\u0001\u000f\u0015\tyA!\u0001\u0004qYV<\u0017N\\\u0005\u0003#9\u0011AB\u0011:jI\u001e,GI]5wKJ\u0004\"a\u0005\r\u000e\u0003QQ!!\u0006\f\u0002\u0019M\u001c\u0017\r\\1m_\u001e<\u0017N\\4\u000b\u0005]A\u0011\u0001\u0003;za\u0016\u001c\u0018MZ3\n\u0005e!\"!D*ue&\u001cG\u000fT8hO&tw\rC\u0005\u001c\u0001\t\u0005\t\u0015!\u0003\u001dS\u0005!a.Y7f!\tibE\u0004\u0002\u001fIA\u0011qDI\u0007\u0002A)\u0011\u0011EC\u0001\u0007yI|w\u000e\u001e \u000b\u0003\r\nQa]2bY\u0006L!!\n\u0012\u0002\rA\u0013X\rZ3g\u0013\t9\u0003F\u0001\u0004TiJLgn\u001a\u0006\u0003K\tJ!a\u0007\u0016\n\u0005-r!AD!cgR\u0014\u0018m\u0019;QYV<\u0017N\u001c\u0005\n[\u0001\u0011\t\u0011)A\u0005]I\nAb]7rI&s7\u000f^1oG\u0016\u0004\"a\f\u0019\u000e\u0003\u0011I!!\r\u0003\u0003\tMk\u0017\u000fZ\u0005\u0003[)B\u0011\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!\u000e\u001e\u0002\r\r|gNZ5h!\t1\u0004(D\u00018\u0015\t!d#\u0003\u0002:o\t11i\u001c8gS\u001eL!\u0001\u000e\u0016\t\u000bq\u0002A\u0011A\u001f\u0002\rqJg.\u001b;?)\u0011q\u0004)\u0011\"\u0011\u0005}\u0002Q\"\u0001\u0002\t\u000bmY\u0004\u0019\u0001\u000f\t\u000b5Z\u0004\u0019\u0001\u0018\t\u000bQZ\u0004\u0019A\u001b\t\u000f\u0011\u0003!\u0019!C\u0001\u000b\u0006Y\u0001/\u0019:bY2,G.[:n+\u00051\u0005CA$I\u001b\u0005\u0011\u0013BA%#\u0005\rIe\u000e\u001e\u0005\u0007\u0017\u0002\u0001\u000b\u0011\u0002$\u0002\u0019A\f'/\u00197mK2L7/\u001c\u0011\t\u000f5\u0003!\u0019!C\u0001\u000b\u0006I\u0011/^3vKNK'0\u001a\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002$\u0002\u0015E,X-^3TSj,\u0007\u0005C\u0004R\u0001\t\u0007I\u0011\u0001*\u0002!=4XM\u001d4m_^\u001cFO]1uK\u001eLX#A*\u0011\u0005QKV\"A+\u000b\u0005Y;\u0016AB:ue\u0016\fWNC\u0001Y\u0003\u0011\t7n[1\n\u0005i+&\u0001E(wKJ4Gn\\<TiJ\fG/Z4z\u0011\u0019a\u0006\u0001)A\u0005'\u0006\trN^3sM2|wo\u0015;sCR,w-\u001f\u0011\t\u000fy\u0003\u0001\u0019!C\u0005?\u000611o\\;sG\u0016,\u0012\u0001\u0019\t\u0004\u000f\u0006\u001c\u0017B\u00012#\u0005\u0019y\u0005\u000f^5p]B\u0019AmZ5\u000e\u0003\u0015T!AZ+\u0002\u0011M\u001c\u0017\r\\1eg2L!\u0001[3\u0003/M{WO]2f#V,W/Z,ji\"\u001cu.\u001c9mKR,\u0007C\u00016q\u001b\u0005Y'B\u00017n\u0003\u0015iw\u000eZ3m\u0015\t1gN\u0003\u0002p/\u0006!\u0001\u000e\u001e;q\u0013\t\t8NA\u0006IiR\u0004(+Z9vKN$\bbB:\u0001\u0001\u0004%I\u0001^\u0001\u000bg>,(oY3`I\u0015\fHCA;y!\t9e/\u0003\u0002xE\t!QK\\5u\u0011\u001dI(/!AA\u0002\u0001\f1\u0001\u001f\u00132\u0011\u0019Y\b\u0001)Q\u0005A\u000691o\\;sG\u0016\u0004\u0003\"B?\u0001\t#r\u0018\u0001D2sK\u0006$XM\u0011:jI\u001e,G#B@\u0002\u0006\u0005\u001d\u0001cA\u0007\u0002\u0002%\u0019\u00111\u0001\b\u0003\r\t\u0013\u0018\u000eZ4f\u0011\u0015!D\u00101\u00016\u0011%\tI\u0001 I\u0001\u0002\u0004\tY!A\u0003j]\u0012,\u0007\u0010E\u0002H\u0003\u001bI1!a\u0004#\u0005\u0011auN\\4\t\u000f\u0005M\u0001\u0001\"\u0015\u0002\u0016\u000591m\u001c8oK\u000e$H#A;\t\u000f\u0005e\u0001\u0001\"\u0015\u0002\u0016\u0005QA-[:d_:tWm\u0019;\t\u000f\u0005u\u0001\u0001\"\u0001\u0002 \u00059A-\u001a7jm\u0016\u0014HcB;\u0002\"\u0005%\u00121\u0007\u0005\b\u0007\u0005m\u0001\u0019AA\u0012!\ry\u0014QE\u0005\u0004\u0003O\u0011!A\u0003%uiB\u0014%/\u001b3hK\"A\u00111FA\u000e\u0001\u0004\ti#A\u0005u_BL7\rU1uQB\u0019q&a\f\n\u0007\u0005EBAA\u0005U_BL7\rU1uQ\"A\u0011QGA\u000e\u0001\u0004\t9$A\u0002ng\u001e\u00042aRA\u001d\u0013\r\tYD\t\u0002\u0004\u0003:L\b\"DA \u0001A\u0005\u0019\u0011!A\u0005\n\u0005\u0005\u0013&\u0001\u0006tkB,'\u000f\n8b[\u0016,\u0012\u0001\b")
/* loaded from: input_file:com/thing2x/smqd/bridge/HttpBridgeDriver.class */
public class HttpBridgeDriver extends BridgeDriver {
    private final int parallelism;
    private final int queueSize;
    private final OverflowStrategy overflowStrategy;
    private Option<SourceQueueWithComplete<HttpRequest>> source;

    private /* synthetic */ String super$name() {
        return super/*com.thing2x.smqd.plugin.AbstractPlugin*/.name();
    }

    public int parallelism() {
        return this.parallelism;
    }

    public int queueSize() {
        return this.queueSize;
    }

    public OverflowStrategy overflowStrategy() {
        return this.overflowStrategy;
    }

    private Option<SourceQueueWithComplete<HttpRequest>> source() {
        return this.source;
    }

    private void source_$eq(Option<SourceQueueWithComplete<HttpRequest>> option) {
        this.source = option;
    }

    public Bridge createBridge(Config config, long j) {
        ContentType.WithFixedCharset application$divoctet$minusstream;
        ContentType.WithFixedCharset application$divoctet$minusstream2;
        ContentType.WithFixedCharset withFixedCharset;
        FilterPath apply = FilterPath$.MODULE$.apply(config.getString("topic"));
        String string = config.getString("method");
        HttpMethod POST = "POST".equals(string) ? HttpMethods$.MODULE$.POST() : "PUT".equals(string) ? HttpMethods$.MODULE$.PUT() : "GET".equals(string) ? HttpMethods$.MODULE$.GET() : "DELETE".equals(string) ? HttpMethods$.MODULE$.DELETE() : "PATCH".equals(string) ? HttpMethods$.MODULE$.PATCH() : HttpMethods$.MODULE$.POST();
        Some optionString = ConfigUtil$.MODULE$.configToOptionalConfig(config).getOptionString("content-type");
        if (optionString instanceof Some) {
            String lowerCase = ((String) optionString.value()).toLowerCase();
            if ("application/json".equals(lowerCase)) {
                withFixedCharset = ContentTypes$.MODULE$.application$divjson();
            } else if ("text/plain".equals(lowerCase)) {
                withFixedCharset = ContentTypes$.MODULE$.text$divplain$u0028UTF$minus8$u0029();
            } else if ("text/html".equals(lowerCase)) {
                withFixedCharset = ContentTypes$.MODULE$.text$divhtml$u0028UTF$minus8$u0029();
            } else if ("text/xml".equals(lowerCase)) {
                withFixedCharset = ContentTypes$.MODULE$.text$divxml$u0028UTF$minus8$u0029();
            } else if ("application/octet-stream".equals(lowerCase)) {
                withFixedCharset = ContentTypes$.MODULE$.application$divoctet$minusstream();
            } else {
                Right parse = ContentType$.MODULE$.parse(lowerCase);
                if (parse instanceof Right) {
                    application$divoctet$minusstream2 = (ContentType) parse.value();
                } else {
                    if (!(parse instanceof Left)) {
                        throw new MatchError(parse);
                    }
                    List list = (List) ((Left) parse).value();
                    if (logger().underlying().isWarnEnabled()) {
                        logger().underlying().warn(new StringBuilder(40).append("HttpBridge mis-configured content-type: ").append(lowerCase).toString(), new Object[]{list.mkString(", ")});
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                    application$divoctet$minusstream2 = ContentTypes$.MODULE$.application$divoctet$minusstream();
                }
                withFixedCharset = application$divoctet$minusstream2;
            }
            application$divoctet$minusstream = withFixedCharset;
        } else {
            if (!None$.MODULE$.equals(optionString)) {
                throw new MatchError(optionString);
            }
            application$divoctet$minusstream = ContentTypes$.MODULE$.application$divoctet$minusstream();
        }
        ContentType.WithFixedCharset withFixedCharset2 = application$divoctet$minusstream;
        Option optionString2 = ConfigUtil$.MODULE$.configToOptionalConfig(config).getOptionString("uri");
        Option optionString3 = ConfigUtil$.MODULE$.configToOptionalConfig(config).getOptionString("prefix");
        Option optionString4 = ConfigUtil$.MODULE$.configToOptionalConfig(config).getOptionString("suffix");
        return new HttpBridge(this, j, apply, POST, withFixedCharset2, (!optionString2.isDefined() || ((String) optionString2.get()).length() <= 0) ? None$.MODULE$ : optionString2, (!optionString3.isDefined() || ((String) optionString3.get()).length() <= 0) ? None$.MODULE$ : optionString3, (!optionString4.isDefined() || ((String) optionString4.get()).length() <= 0) ? None$.MODULE$ : optionString4);
    }

    public void connect() {
        MessageDispatcher gloablDispatcher = super/*com.thing2x.smqd.plugin.AbstractPlugin*/.smqdInstance().Implicit().gloablDispatcher();
        ActorSystem system = super/*com.thing2x.smqd.plugin.AbstractPlugin*/.smqdInstance().Implicit().system();
        Materializer materializer = super/*com.thing2x.smqd.plugin.AbstractPlugin*/.smqdInstance().Implicit().materializer();
        HttpExt apply = Http$.MODULE$.apply(system);
        SourceQueueWithComplete sourceQueueWithComplete = (SourceQueueWithComplete) Source$.MODULE$.queue(queueSize(), overflowStrategy()).mapAsyncUnordered(parallelism(), httpRequest -> {
            return apply.singleRequest(httpRequest, apply.singleRequest$default$2(), apply.singleRequest$default$3(), apply.singleRequest$default$4());
        }).toMat(Sink$.MODULE$.last(), Keep$.MODULE$.left()).run(materializer);
        sourceQueueWithComplete.watchCompletion().onComplete(r6 -> {
            $anonfun$connect$2(this, apply, r6);
            return BoxedUnit.UNIT;
        }, gloablDispatcher);
        source_$eq(new Some(sourceQueueWithComplete));
    }

    public void disconnect() {
        if (source().isDefined()) {
            ((SourceQueueWithComplete) source().get()).complete();
        }
        source_$eq(None$.MODULE$);
    }

    public void deliver(HttpBridge httpBridge, TopicPath topicPath, Object obj) {
        BoxedUnit boxedUnit;
        HttpEntity.Strict apply;
        BoxedUnit boxedUnit2;
        Some source = source();
        if (source instanceof Some) {
            SourceQueueWithComplete sourceQueueWithComplete = (SourceQueueWithComplete) source.value();
            if (!isClosed()) {
                String sb = httpBridge.path().isDefined() ? (String) httpBridge.path().get() : new StringBuilder(0).append((String) httpBridge.prefix().getOrElse(() -> {
                    return "";
                })).append(topicPath.toString()).append(httpBridge.suffix().getOrElse(() -> {
                    return "";
                })).toString();
                if (obj instanceof ByteBuf) {
                    ByteBuf byteBuf = (ByteBuf) obj;
                    byte[] bArr = new byte[byteBuf.readableBytes()];
                    byteBuf.getBytes(0, bArr);
                    apply = HttpEntity$.MODULE$.apply(httpBridge.contentType(), bArr);
                } else {
                    apply = obj instanceof ByteString ? HttpEntity$.MODULE$.apply(httpBridge.contentType(), (ByteString) obj) : HttpEntity$.MODULE$.apply(obj.toString());
                }
                HttpEntity.Strict strict = apply;
                sourceQueueWithComplete.offer(HttpRequest$.MODULE$.apply(httpBridge.method(), Uri$.MODULE$.apply(sb), HttpRequest$.MODULE$.apply$default$3(), strict, HttpRequest$.MODULE$.apply$default$5()));
                if (logger().underlying().isTraceEnabled()) {
                    logger().underlying().trace("HttpBridgeDriver({}) {} {}, payload: {} {} bytes", new Object[]{super/*com.thing2x.smqd.plugin.AbstractPlugin*/.name(), httpBridge.method().value(), sb.toString(), strict.contentType().toString(), BoxesRunTime.boxToLong(strict.contentLength())});
                    boxedUnit2 = BoxedUnit.UNIT;
                } else {
                    boxedUnit2 = BoxedUnit.UNIT;
                }
                return;
            }
        }
        if (logger().underlying().isWarnEnabled()) {
            logger().underlying().warn("HttpBridgeDriver({}) is not connected, messages for '{}' will be discarded", new String[]{super/*com.thing2x.smqd.plugin.AbstractPlugin*/.name(), topicPath.toString()});
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$connect$2(HttpBridgeDriver httpBridgeDriver, HttpExt httpExt, Try r9) {
        BoxedUnit boxedUnit;
        BoxedUnit boxedUnit2;
        if (r9 instanceof Success) {
            httpBridgeDriver.source_$eq(None$.MODULE$);
            httpExt.shutdownAllConnectionPools();
            if (httpBridgeDriver.logger().underlying().isDebugEnabled()) {
                httpBridgeDriver.logger().underlying().debug("HttpBridgeDriver({}) connection closed.", new Object[]{httpBridgeDriver.super$name()});
                boxedUnit2 = BoxedUnit.UNIT;
            } else {
                boxedUnit2 = BoxedUnit.UNIT;
            }
            return;
        }
        if (!(r9 instanceof Failure)) {
            throw new MatchError(r9);
        }
        Throwable exception = ((Failure) r9).exception();
        httpBridgeDriver.source_$eq(None$.MODULE$);
        httpExt.shutdownAllConnectionPools();
        if (httpBridgeDriver.logger().underlying().isDebugEnabled()) {
            httpBridgeDriver.logger().underlying().debug(new StringBuilder(36).append("MqttBridgeDriver(").append(httpBridgeDriver.super$name()).append(") connection lost: ").toString(), exception);
            boxedUnit = BoxedUnit.UNIT;
        } else {
            boxedUnit = BoxedUnit.UNIT;
        }
    }

    public HttpBridgeDriver(String str, Smqd smqd, Config config) {
        super(str, smqd, config);
        int unboxToInt = BoxesRunTime.unboxToInt(ConfigUtil$.MODULE$.configToOptionalConfig(super/*com.thing2x.smqd.plugin.AbstractPlugin*/.config()).getOptionInt("parallelism").getOrElse(() -> {
            return 1;
        }));
        switch (unboxToInt) {
            default:
                this.parallelism = unboxToInt <= 0 ? 1 : unboxToInt;
                this.queueSize = BoxesRunTime.unboxToInt(ConfigUtil$.MODULE$.configToOptionalConfig(super/*com.thing2x.smqd.plugin.AbstractPlugin*/.config()).getOptionInt("queue").getOrElse(() -> {
                    return 10;
                }));
                this.overflowStrategy = ConfigUtil$.MODULE$.configToOptionalConfig(super/*com.thing2x.smqd.plugin.AbstractPlugin*/.config()).getOverflowStrategy("overflow-strategy");
                this.source = None$.MODULE$;
                return;
        }
    }
}
