package org.eclipse.ditto.services.gateway.endpoints.routes.sse;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.MediaTypes;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.model.sse.ServerSentEvent;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.server.directives.RouteDirectives;
import akka.japi.pf.PFBuilder;
import akka.pattern.Patterns;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.query.criteria.CriteriaFactoryImpl;
import org.eclipse.ditto.model.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.model.query.things.ModelBasedThingsFieldExpressionFactory;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingFieldSelector;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.services.gateway.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.services.gateway.endpoints.routes.things.ThingsParameter;
import org.eclipse.ditto.services.gateway.endpoints.utils.EventSniffer;
import org.eclipse.ditto.services.gateway.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.StartStreaming;
import org.eclipse.ditto.services.gateway.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.services.gateway.streaming.actors.SupervisedStream;
import org.eclipse.ditto.services.gateway.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.services.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.services.utils.pubsub.StreamingType;
import org.eclipse.ditto.services.utils.search.SearchSource;
import org.eclipse.ditto.services.utils.search.SearchSourceBuilder;
import org.eclipse.ditto.signals.events.things.ThingEvent;
import scala.PartialFunction;

@NotThreadSafe
/* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder.class */
public final class ThingsSseRouteBuilder extends RouteDirectives implements SseRouteBuilder {
    private static final String PATH_SEARCH = "search";
    private static final String PATH_THINGS = "things";
    private static final String STREAMING_TYPE_SSE = "SSE";
    private static final String LAST_EVENT_ID_HEADER = "Last-Event-ID";
    private static final String PARAM_FILTER = "filter";
    private static final String PARAM_FIELDS = "fields";
    private static final String PARAM_OPTION = "option";
    private static final String PARAM_NAMESPACES = "namespaces";
    private static final String PARAM_EXTRA_FIELDS = "extraFields";
    private static final PartialFunction<HttpHeader, Accept> ACCEPT_HEADER_EXTRACTOR = newAcceptHeaderExtractor();
    private static final Counter THINGS_SSE_COUNTER = getCounterFor("things");
    private static final Counter SEARCH_SSE_COUNTER = getCounterFor("search");
    private static final Duration LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);
    private final ActorRef streamingActor;
    private final StreamingConfig streamingConfig;
    private final QueryFilterCriteriaFactory queryFilterCriteriaFactory;
    private final ActorRef pubSubMediator;
    private SseAuthorizationEnforcer sseAuthorizationEnforcer = new NoOpSseAuthorizationEnforcer();
    private SseConnectionSupervisor sseConnectionSupervisor = new NoOpSseConnectionSupervisor();
    private EventSniffer<ServerSentEvent> eventSniffer = EventSniffer.noOp();

    @Nullable
    GatewaySignalEnrichmentProvider signalEnrichmentProvider;

    @Nullable
    ActorRef proxyActor;

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder$NoOpSseAuthorizationEnforcer.class */
    private static final class NoOpSseAuthorizationEnforcer implements SseAuthorizationEnforcer {
        private NoOpSseAuthorizationEnforcer() {
        }

        @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseAuthorizationEnforcer
        public CompletionStage<Void> checkAuthorization(RequestContext requestContext, DittoHeaders dittoHeaders) {
            return CompletableFuture.completedStage(null);
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder$NoOpSseConnectionSupervisor.class */
    private static final class NoOpSseConnectionSupervisor implements SseConnectionSupervisor {
        private NoOpSseConnectionSupervisor() {
        }

        @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseConnectionSupervisor
        public void supervise(SupervisedStream supervisedStream, CharSequence charSequence, DittoHeaders dittoHeaders) {
        }
    }

    private ThingsSseRouteBuilder(ActorRef actorRef, StreamingConfig streamingConfig, QueryFilterCriteriaFactory queryFilterCriteriaFactory, ActorRef actorRef2) {
        this.streamingActor = actorRef;
        this.streamingConfig = streamingConfig;
        this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
        this.pubSubMediator = actorRef2;
    }

    public static ThingsSseRouteBuilder getInstance(ActorRef actorRef, StreamingConfig streamingConfig, ActorRef actorRef2) {
        ConditionChecker.checkNotNull(actorRef, "streamingActor");
        return new ThingsSseRouteBuilder(actorRef, streamingConfig, new QueryFilterCriteriaFactory(new CriteriaFactoryImpl(), new ModelBasedThingsFieldExpressionFactory()), actorRef2);
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public SseRouteBuilder withAuthorizationEnforcer(SseAuthorizationEnforcer sseAuthorizationEnforcer) {
        this.sseAuthorizationEnforcer = (SseAuthorizationEnforcer) ConditionChecker.checkNotNull(sseAuthorizationEnforcer, "enforcer");
        return this;
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public ThingsSseRouteBuilder withEventSniffer(EventSniffer<ServerSentEvent> eventSniffer) {
        this.eventSniffer = (EventSniffer) ConditionChecker.checkNotNull(eventSniffer, "eventSniffer");
        return this;
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public SseRouteBuilder withSseConnectionSupervisor(SseConnectionSupervisor sseConnectionSupervisor) {
        this.sseConnectionSupervisor = (SseConnectionSupervisor) ConditionChecker.checkNotNull(sseConnectionSupervisor, "sseConnectionSupervisor");
        return this;
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public SseRouteBuilder withSignalEnrichmentProvider(@Nullable GatewaySignalEnrichmentProvider gatewaySignalEnrichmentProvider) {
        this.signalEnrichmentProvider = gatewaySignalEnrichmentProvider;
        return this;
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public SseRouteBuilder withProxyActor(@Nullable ActorRef actorRef) {
        this.proxyActor = actorRef;
        return this;
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public Route build(RequestContext requestContext, Supplier<CompletionStage<DittoHeaders>> supplier) {
        return concat(rawPathPrefix(PathMatchers.slash().concat("things"), () -> {
            return pathEndOrSingleSlash(() -> {
                return get(() -> {
                    return headerValuePF(ACCEPT_HEADER_EXTRACTOR, accept -> {
                        return buildThingsSseRoute(requestContext, ((CompletionStage) supplier.get()).thenApply(ThingsSseRouteBuilder::getDittoHeadersWithCorrelationId));
                    });
                });
            });
        }), new Route[]{buildSearchSseRoute(requestContext, supplier)});
    }

    private Route buildSearchSseRoute(RequestContext requestContext, Supplier<CompletionStage<DittoHeaders>> supplier) {
        return rawPathPrefix(PathMatchers.slash().concat("search").slash().concat("things"), () -> {
            return pathEndOrSingleSlash(() -> {
                return get(() -> {
                    return headerValuePF(ACCEPT_HEADER_EXTRACTOR, accept -> {
                        CompletionStage thenApply = ((CompletionStage) supplier.get()).thenApply(ThingsSseRouteBuilder::getDittoHeadersWithCorrelationId);
                        return parameterMap(map -> {
                            return createSearchSseRoute(requestContext, thenApply, map);
                        });
                    });
                });
            });
        });
    }

    private static DittoHeaders getDittoHeadersWithCorrelationId(DittoHeaders dittoHeaders) {
        return dittoHeaders.getCorrelationId().isPresent() ? dittoHeaders : dittoHeaders.toBuilder().correlationId(String.valueOf(UUID.randomUUID())).build();
    }

    private Route buildThingsSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage) {
        return parameterMap(map -> {
            return createSseRoute(requestContext, completionStage, map);
        });
    }

    private Route createSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage, Map<String, String> map) {
        String str = map.get(PARAM_FILTER);
        List<String> namespaces = getNamespaces(map.get(PARAM_NAMESPACES));
        List<ThingId> thingIds = getThingIds(map.get(ThingsParameter.IDS.toString()));
        ThingFieldSelector fieldSelector = getFieldSelector(map.get(ThingsParameter.FIELDS.toString()));
        ThingFieldSelector fieldSelector2 = getFieldSelector(map.get(PARAM_EXTRA_FIELDS));
        SignalEnrichmentFacade facade = this.signalEnrichmentProvider == null ? null : this.signalEnrichmentProvider.getFacade(requestContext.getRequest());
        return completeOKWithFuture(completionStage.thenCompose(dittoHeaders -> {
            return this.sseAuthorizationEnforcer.checkAuthorization(requestContext, dittoHeaders).thenApply(r17 -> {
                if (str != null) {
                    this.queryFilterCriteriaFactory.filterCriteria(str, dittoHeaders);
                }
                return SupervisedStream.sourceQueue(10).viaMat(KillSwitches.single(), Keep.both()).mapMaterializedValue(pair -> {
                    SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair.first();
                    KillSwitch killSwitch = (KillSwitch) pair.second();
                    String str2 = (String) dittoHeaders.getCorrelationId().orElseThrow(() -> {
                        return new IllegalStateException("Expected correlation-id in SSE DittoHeaders: " + dittoHeaders);
                    });
                    JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                    this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str2, dittoHeaders);
                    AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
                    Connect connect = new Connect(withQueue.getSourceQueue(), str2, STREAMING_TYPE_SSE, jsonSchemaVersion, (Instant) null, Set.of(), authorizationContext);
                    StartStreaming build = StartStreaming.getBuilder(StreamingType.EVENTS, str2, authorizationContext).withNamespaces(namespaces).withFilter(str).withExtraFields(fieldSelector2).build();
                    CompletionStage ask = Patterns.ask(this.streamingActor, connect, LOCAL_ASK_TIMEOUT);
                    Class<ActorRef> cls = ActorRef.class;
                    Objects.requireNonNull(ActorRef.class);
                    ask.thenApply(cls::cast).thenAccept(actorRef -> {
                        actorRef.tell(build, ActorRef.noSender());
                    }).exceptionally(th -> {
                        killSwitch.abort(th);
                        return null;
                    });
                    return NotUsed.getInstance();
                }).mapAsync(this.streamingConfig.getParallelism(), sessionedJsonifiable -> {
                    return postprocess(sessionedJsonifiable, facade, thingIds, namespaces, fieldSelector);
                }).mapConcat(collection -> {
                    return collection;
                }).map(jsonObject -> {
                    THINGS_SSE_COUNTER.increment();
                    return ServerSentEvent.create(jsonObject.toString());
                }).log("SSE things").viaMat(this.eventSniffer.toAsyncFlow(requestContext.getRequest()), Keep.none()).keepAlive(Duration.ofSeconds(1L), ServerSentEvent::heartbeat);
            });
        }), EventStreamMarshalling.toEventStream());
    }

    private Route createSearchSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage, Map<String, String> map) {
        return this.proxyActor == null ? complete(StatusCodes.NOT_IMPLEMENTED) : completeOKWithFuture(completionStage.thenApply(dittoHeaders -> {
            this.sseAuthorizationEnforcer.checkAuthorization(requestContext, dittoHeaders);
            SearchSourceBuilder dittoHeaders = SearchSource.newBuilder().pubSubMediator(this.pubSubMediator).conciergeForwarder(ActorSelection.apply(this.proxyActor, "")).filter((String) map.get(PARAM_FILTER)).options((String) map.get(PARAM_OPTION)).fields((String) map.get(PARAM_FIELDS)).namespaces((String) map.get(PARAM_NAMESPACES)).dittoHeaders(dittoHeaders);
            requestContext.getRequest().getHeader(LAST_EVENT_ID_HEADER).ifPresent(httpHeader -> {
                dittoHeaders.lastThingId(httpHeader.value());
            });
            return dittoHeaders.build().startAsPair(resumeSourceBuilder -> {
            }).via(AbstractRoute.throttleByConfig(this.streamingConfig.getSseConfig().getThrottlingConfig())).map(pair -> {
                SEARCH_SSE_COUNTER.increment();
                return ServerSentEvent.create(((JsonObject) pair.second()).toString(), Optional.empty(), Optional.of((String) pair.first()), OptionalInt.empty());
            }).recoverWithRetries(1, new PFBuilder().match(DittoRuntimeException.class, dittoRuntimeException -> {
                return Source.single(ServerSentEvent.create(dittoRuntimeException.toJsonString()));
            }).build()).log("SSE search").via(this.eventSniffer.toAsyncFlow(requestContext.getRequest()));
        }), EventStreamMarshalling.toEventStream());
    }

    private CompletionStage<Collection<JsonObject>> postprocess(SessionedJsonifiable sessionedJsonifiable, @Nullable SignalEnrichmentFacade signalEnrichmentFacade, Collection<ThingId> collection, Collection<String> collection2, @Nullable JsonFieldSelector jsonFieldSelector) {
        Supplier supplier = () -> {
            return CompletableFuture.completedFuture(Collections.emptyList());
        };
        if (sessionedJsonifiable.getJsonifiable() instanceof ThingEvent) {
            ThingEvent jsonifiable = sessionedJsonifiable.getJsonifiable();
            if (!StreamingType.isLiveSignal(jsonifiable) && namespaceMatches(jsonifiable, collection2) && targetThingIdMatches(jsonifiable, collection)) {
                return (CompletionStage) sessionedJsonifiable.getSession().map(streamingSession -> {
                    return sessionedJsonifiable.retrieveExtraFields(signalEnrichmentFacade).thenApply(jsonObject -> {
                        Optional of = Optional.of(streamingSession.mergeThingWithExtra(jsonifiable, jsonObject));
                        Objects.requireNonNull(streamingSession);
                        return (Collection) of.filter(streamingSession::matchesFilter).map(thing -> {
                            return toNonemptyThingJson(thing, jsonifiable, jsonFieldSelector);
                        }).orElseGet(Collections::emptyList);
                    }).exceptionally(th -> {
                        return Collections.singletonList((th instanceof DittoRuntimeException ? (DittoRuntimeException) th : SignalEnrichmentFailedException.newBuilder().build()).toJson());
                    });
                }).orElseGet(supplier);
            }
        }
        return (CompletionStage) supplier.get();
    }

    private static boolean namespaceMatches(ThingEvent<?> thingEvent, Collection<String> collection) {
        return collection.isEmpty() || collection.contains(namespaceFromId(thingEvent));
    }

    private static boolean targetThingIdMatches(ThingEvent<?> thingEvent, Collection<ThingId> collection) {
        return collection.isEmpty() || collection.contains(thingEvent.getEntityId());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Collection<JsonObject> toNonemptyThingJson(Thing thing, ThingEvent<?> thingEvent, @Nullable JsonFieldSelector jsonFieldSelector) {
        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) thingEvent.getDittoHeaders().getSchemaVersion().orElse(thingEvent.getImplementedSchemaVersion());
        JsonObject json = null != jsonFieldSelector ? thing.toJson(jsonSchemaVersion, jsonFieldSelector) : thing.toJson(jsonSchemaVersion);
        return json.isEmpty() ? Collections.emptyList() : Collections.singletonList(json);
    }

    private static List<String> getNamespaces(@Nullable String str) {
        return null != str ? Arrays.asList(str.split(",")) : Collections.emptyList();
    }

    private static List<ThingId> getThingIds(@Nullable String str) {
        return null != str ? (List) Stream.of((Object[]) str.split(",")).map((v0) -> {
            return ThingId.of(v0);
        }).collect(Collectors.toList()) : Collections.emptyList();
    }

    @Nullable
    private static ThingFieldSelector getFieldSelector(@Nullable String str) {
        if (str == null) {
            return null;
        }
        return ThingFieldSelector.fromString(str);
    }

    private static String namespaceFromId(ThingEvent<?> thingEvent) {
        return thingEvent.getEntityId().getNamespace();
    }

    private static PartialFunction<HttpHeader, Accept> newAcceptHeaderExtractor() {
        return new PFBuilder().match(Accept.class, ThingsSseRouteBuilder::matchesTextEventStream, accept -> {
            return accept;
        }).build();
    }

    private static boolean matchesTextEventStream(Accept accept) {
        return StreamSupport.stream(accept.getMediaRanges().spliterator(), false).filter(mediaRange -> {
            return !"*".equals(mediaRange.mainType());
        }).anyMatch(mediaRange2 -> {
            return mediaRange2.matches(MediaTypes.TEXT_EVENT_STREAM);
        });
    }

    private static Counter getCounterFor(String str) {
        return DittoMetrics.counter("streaming_messages").tag("type", "sse").tag("direction", "out").tag("path", str);
    }

    @Override // org.eclipse.ditto.services.gateway.endpoints.routes.sse.SseRouteBuilder
    public /* bridge */ /* synthetic */ SseRouteBuilder withEventSniffer(EventSniffer eventSniffer) {
        return withEventSniffer((EventSniffer<ServerSentEvent>) eventSniffer);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2034926781:
                if (implMethodName.equals("lambda$createSseRoute$4d1b316f$1")) {
                    z = 5;
                    break;
                }
                break;
            case 200896764:
                if (implMethodName.equals("heartbeat")) {
                    z = 4;
                    break;
                }
                break;
            case 1073074708:
                if (implMethodName.equals("lambda$createSearchSseRoute$36df5910$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1456606178:
                if (implMethodName.equals("lambda$createSseRoute$19a265b8$1")) {
                    z = false;
                    break;
                }
                break;
            case 2003587612:
                if (implMethodName.equals("lambda$createSseRoute$36df5910$1")) {
                    z = true;
                    break;
                }
                break;
            case 2003587613:
                if (implMethodName.equals("lambda$createSseRoute$36df5910$2")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/models/signalenrichment/SignalEnrichmentFacade;Ljava/util/List;Ljava/util/List;Lorg/eclipse/ditto/model/things/ThingFieldSelector;Lorg/eclipse/ditto/services/gateway/streaming/actors/SessionedJsonifiable;)Ljava/util/concurrent/CompletionStage;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    SignalEnrichmentFacade signalEnrichmentFacade = (SignalEnrichmentFacade) serializedLambda.getCapturedArg(1);
                    List list = (List) serializedLambda.getCapturedArg(2);
                    List list2 = (List) serializedLambda.getCapturedArg(3);
                    ThingFieldSelector thingFieldSelector = (ThingFieldSelector) serializedLambda.getCapturedArg(4);
                    return sessionedJsonifiable -> {
                        return postprocess(sessionedJsonifiable, signalEnrichmentFacade, list, list2, thingFieldSelector);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Collection;)Ljava/lang/Iterable;")) {
                    return collection -> {
                        return collection;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/json/JsonObject;)Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return jsonObject -> {
                        THINGS_SSE_COUNTER.increment();
                        return ServerSentEvent.create(jsonObject.toString());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/Pair;)Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return pair -> {
                        SEARCH_SSE_COUNTER.increment();
                        return ServerSentEvent.create(((JsonObject) pair.second()).toString(), Optional.empty(), Optional.of((String) pair.first()), OptionalInt.empty());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/sse/ServerSentEvent") && serializedLambda.getImplMethodSignature().equals("()Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return ServerSentEvent::heartbeat;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/model/base/headers/DittoHeaders;Ljava/util/List;Ljava/lang/String;Lorg/eclipse/ditto/model/things/ThingFieldSelector;Lakka/japi/Pair;)Lakka/NotUsed;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder2 = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    DittoHeaders dittoHeaders = (DittoHeaders) serializedLambda.getCapturedArg(1);
                    List list3 = (List) serializedLambda.getCapturedArg(2);
                    String str = (String) serializedLambda.getCapturedArg(3);
                    ThingFieldSelector thingFieldSelector2 = (ThingFieldSelector) serializedLambda.getCapturedArg(4);
                    return pair2 -> {
                        SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair2.first();
                        KillSwitch killSwitch = (KillSwitch) pair2.second();
                        String str2 = (String) dittoHeaders.getCorrelationId().orElseThrow(() -> {
                            return new IllegalStateException("Expected correlation-id in SSE DittoHeaders: " + dittoHeaders);
                        });
                        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                        this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str2, dittoHeaders);
                        AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
                        Connect connect = new Connect(withQueue.getSourceQueue(), str2, STREAMING_TYPE_SSE, jsonSchemaVersion, (Instant) null, Set.of(), authorizationContext);
                        StartStreaming build = StartStreaming.getBuilder(StreamingType.EVENTS, str2, authorizationContext).withNamespaces(list3).withFilter(str).withExtraFields(thingFieldSelector2).build();
                        CompletionStage ask = Patterns.ask(this.streamingActor, connect, LOCAL_ASK_TIMEOUT);
                        Class<ActorRef> cls = ActorRef.class;
                        Objects.requireNonNull(ActorRef.class);
                        ask.thenApply(cls::cast).thenAccept(actorRef -> {
                            actorRef.tell(build, ActorRef.noSender());
                        }).exceptionally(th -> {
                            killSwitch.abort(th);
                            return null;
                        });
                        return NotUsed.getInstance();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
