package org.eclipse.ditto.client.live.internal;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.ParametersAreNonnullByDefault;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.client.internal.CommonManagementImpl;
import org.eclipse.ditto.client.internal.HandlerRegistry;
import org.eclipse.ditto.client.internal.OutgoingMessageFactory;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.internal.bus.PointerBus;
import org.eclipse.ditto.client.internal.bus.SelectorUtil;
import org.eclipse.ditto.client.live.Live;
import org.eclipse.ditto.client.live.LiveCommandProcessor;
import org.eclipse.ditto.client.live.LiveFeatureHandle;
import org.eclipse.ditto.client.live.LiveThingHandle;
import org.eclipse.ditto.client.live.commands.LiveCommandFactory;
import org.eclipse.ditto.client.live.commands.LiveCommandHandler;
import org.eclipse.ditto.client.live.commands.base.LiveCommand;
import org.eclipse.ditto.client.live.events.GlobalEventFactory;
import org.eclipse.ditto.client.live.events.internal.ImmutableGlobalEventFactory;
import org.eclipse.ditto.client.live.messages.MessageSerializerKey;
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.live.messages.PendingMessage;
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.WithThingId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
/* loaded from: input_file:org/eclipse/ditto/client/live/internal/LiveImpl.class */
public final class LiveImpl extends CommonManagementImpl<LiveThingHandle, LiveFeatureHandle> implements Live {
    private static final Logger LOGGER = LoggerFactory.getLogger(LiveImpl.class);
    private final JsonSchemaVersion schemaVersion;
    private final MessageSerializerRegistry messageSerializerRegistry;
    private final Map<Class<? extends LiveCommand<?, ?>>, LiveCommandHandler<?, ?>> liveCommandHandlers;
    private final Map<Classification.StreamingType, AdaptableBus.SubscriptionId> subscriptionIds;

    private LiveImpl(MessagingProvider messagingProvider, OutgoingMessageFactory outgoingMessageFactory, PointerBus pointerBus, JsonSchemaVersion jsonSchemaVersion, MessageSerializerRegistry messageSerializerRegistry) {
        super(TopicPath.Channel.LIVE, messagingProvider, outgoingMessageFactory, new HandlerRegistry(pointerBus), pointerBus);
        this.schemaVersion = jsonSchemaVersion;
        this.messageSerializerRegistry = messageSerializerRegistry;
        this.liveCommandHandlers = new ConcurrentHashMap();
        this.subscriptionIds = new ConcurrentHashMap();
    }

    public static LiveImpl newInstance(MessagingProvider messagingProvider, OutgoingMessageFactory outgoingMessageFactory, PointerBus pointerBus, JsonSchemaVersion jsonSchemaVersion, MessageSerializerRegistry messageSerializerRegistry) {
        return new LiveImpl(messagingProvider, outgoingMessageFactory, pointerBus, jsonSchemaVersion, messageSerializerRegistry);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.ditto.client.internal.CommonManagementImpl
    /* renamed from: createThingHandle, reason: merged with bridge method [inline-methods] */
    public LiveThingHandle createThingHandle2(ThingId thingId) {
        return new LiveThingHandleImpl(thingId, getMessagingProvider(), getOutgoingMessageFactory(), getHandlerRegistry(), this.messageSerializerRegistry);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.eclipse.ditto.client.internal.CommonManagementImpl
    /* renamed from: createFeatureHandle, reason: merged with bridge method [inline-methods] */
    public LiveFeatureHandle createFeatureHandle2(ThingId thingId, String str) {
        return new LiveFeatureHandleImpl(thingId, str, getMessagingProvider(), getOutgoingMessageFactory(), getHandlerRegistry(), this.messageSerializerRegistry);
    }

    @Override // org.eclipse.ditto.client.internal.CommonManagementImpl
    protected CompletionStage<Void> doStartConsumption(Map<String, String> map) {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        CompletableFuture completableFuture3 = new CompletableFuture();
        CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture, completableFuture2, completableFuture3);
        this.subscriptionIds.compute(Classification.StreamingType.LIVE_EVENT, (streamingType, subscriptionId) -> {
            String buildProtocolCommand = buildProtocolCommand(streamingType.start(), map);
            this.messagingProvider.registerSubscriptionMessage(streamingType, buildProtocolCommand);
            return subscribe(subscriptionId, streamingType, buildProtocolCommand, streamingType.startAck(), completableFuture);
        });
        this.subscriptionIds.compute(Classification.StreamingType.LIVE_MESSAGE, (streamingType2, subscriptionId2) -> {
            String buildProtocolCommand = buildProtocolCommand(streamingType2.start(), map);
            this.messagingProvider.registerSubscriptionMessage(streamingType2, buildProtocolCommand);
            return subscribeAndPublishMessage(subscriptionId2, streamingType2, buildProtocolCommand, streamingType2.startAck(), completableFuture2, adaptable -> {
                return pointerBus -> {
                    pointerBus.notify((CharSequence) getPointerBusKey(adaptable), (String) adaptableAsLiveMessage(adaptable));
                };
            });
        });
        this.subscriptionIds.compute(Classification.StreamingType.LIVE_COMMAND, (streamingType3, subscriptionId3) -> {
            String buildProtocolCommand = buildProtocolCommand(streamingType3.start(), map);
            this.messagingProvider.registerSubscriptionMessage(streamingType3, buildProtocolCommand);
            return subscribeAndPublishMessage(subscriptionId3, streamingType3, buildProtocolCommand, streamingType3.startAck(), completableFuture3, adaptable -> {
                return pointerBus -> {
                    pointerBus.getExecutor().submit(() -> {
                        handleLiveCommandOrResponse(adaptable);
                    });
                };
            });
        });
        return allOf;
    }

    @Override // org.eclipse.ditto.client.management.CommonManagement
    public CompletionStage<Void> suspendConsumption() {
        return CompletableFuture.allOf((CompletableFuture[]) this.subscriptionIds.entrySet().stream().map(entry -> {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.messagingProvider.unregisterSubscriptionMessage(entry.getKey());
            unsubscribe((AdaptableBus.SubscriptionId) entry.getValue(), ((Classification.StreamingType) entry.getKey()).stop(), ((Classification.StreamingType) entry.getKey()).stopAck(), completableFuture);
            return completableFuture;
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
    }

    private static String getPointerBusKey(Adaptable adaptable) {
        TopicPath topicPath = adaptable.getTopicPath();
        return String.format("/things/%s:%s%s", topicPath.getNamespace(), topicPath.getEntityName(), adaptable.getPayload().getPath());
    }

    @Override // org.eclipse.ditto.client.live.Live
    public <T> PendingMessage<T> message() {
        return PendingMessageImpl.of(LOGGER, this.outgoingMessageFactory, this.messageSerializerRegistry, PROTOCOL_ADAPTER, this.messagingProvider);
    }

    @Override // org.eclipse.ditto.client.live.messages.MessageRegistration
    public <T, U> void registerForMessage(String str, String str2, Class<T> cls, Consumer<RepliableMessage<T, U>> consumer) {
        checkRegistrationId(str);
        checkSubject(str2);
        ConditionChecker.argumentNotNull(cls, "type");
        checkHandler(consumer);
        LiveMessagesUtil.checkSerializerExistForMessageType(this.messageSerializerRegistry, cls, str2);
        getHandlerRegistry().register(str, SelectorUtil.or(MessageSerializerKey.SUBJECT_WILDCARD.equals(str2) ? SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/'{direction}'/messages/'{subject}'", new Object[0]) : SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/'{direction}'/messages/{0}", str2), MessageSerializerKey.SUBJECT_WILDCARD.equals(str2) ? SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/features/'{featureId}'/'{direction}'/messages/'{subject}'", new Object[0]) : SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/features/'{featureId}'/'{direction}'/messages/{0}", str2)), LiveMessagesUtil.createEventConsumerForRepliableMessage(PROTOCOL_ADAPTER, getMessagingProvider(), getOutgoingMessageFactory(), this.messageSerializerRegistry, cls, consumer));
    }

    private static void checkRegistrationId(String str) {
        ConditionChecker.argumentNotNull(str, "registrationId");
    }

    private static void checkSubject(String str) {
        ConditionChecker.argumentNotNull(str, "subject");
    }

    private static void checkHandler(Consumer<?> consumer) {
        ConditionChecker.argumentNotNull(consumer, "handler");
    }

    @Override // org.eclipse.ditto.client.live.messages.MessageRegistration
    public <U> void registerForMessage(String str, String str2, Consumer<RepliableMessage<?, U>> consumer) {
        checkRegistrationId(str);
        checkSubject(str2);
        checkHandler(consumer);
        getHandlerRegistry().register(str, SelectorUtil.or(MessageSerializerKey.SUBJECT_WILDCARD.equals(str2) ? SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/'{direction}'/messages/'{subject}'", new Object[0]) : SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/'{direction}'/messages/{0}", str2), MessageSerializerKey.SUBJECT_WILDCARD.equals(str2) ? SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/features/'{featureId}'/'{direction}'/messages/'{subject}'", new Object[0]) : SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/features/'{featureId}'/'{direction}'/messages/{0}", str2)), LiveMessagesUtil.createEventConsumerForRepliableMessage(PROTOCOL_ADAPTER, getMessagingProvider(), getOutgoingMessageFactory(), this.messageSerializerRegistry, consumer));
    }

    @Override // org.eclipse.ditto.client.live.messages.ClaimMessageRegistration
    public <T, U> void registerForClaimMessage(String str, Class<T> cls, Consumer<RepliableMessage<T, U>> consumer) {
        checkRegistrationId(str);
        ConditionChecker.argumentNotNull(cls, "type");
        checkHandler(consumer);
        LiveMessagesUtil.checkSerializerExistForMessageType(this.messageSerializerRegistry, cls);
        getHandlerRegistry().register(str, SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/'{direction}'/messages/{0}", "claim"), LiveMessagesUtil.createEventConsumerForRepliableMessage(PROTOCOL_ADAPTER, getMessagingProvider(), getOutgoingMessageFactory(), this.messageSerializerRegistry, cls, consumer));
    }

    @Override // org.eclipse.ditto.client.live.messages.ClaimMessageRegistration
    public <U> void registerForClaimMessage(String str, Consumer<RepliableMessage<?, U>> consumer) {
        checkRegistrationId(str);
        checkHandler(consumer);
        getHandlerRegistry().register(str, SelectorUtil.formatJsonPointer(LOGGER, "/things/'{thingId}'/inbox/messages/{0}", "claim"), LiveMessagesUtil.createEventConsumerForRepliableMessage(PROTOCOL_ADAPTER, getMessagingProvider(), getOutgoingMessageFactory(), this.messageSerializerRegistry, consumer));
    }

    @Override // org.eclipse.ditto.client.live.events.EventEmitter
    public void emitEvent(Function<GlobalEventFactory, Event<?>> function) {
        ConditionChecker.argumentNotNull(function);
        getMessagingProvider().emit(signalToJsonString(adjustHeadersForLiveSignal(function.apply(ImmutableGlobalEventFactory.getInstance(this.schemaVersion)))));
    }

    @Override // org.eclipse.ditto.client.live.LiveCommandProcessor
    public Map<Class<? extends LiveCommand<?, ?>>, LiveCommandHandler<?, ?>> getLiveCommandHandlers() {
        return this.liveCommandHandlers;
    }

    @Override // org.eclipse.ditto.client.live.LiveCommandProcessor
    public void publishLiveSignal(Signal<?> signal) {
        getMessagingProvider().emitAdaptable(adaptOutgoingLiveSignal(signal));
    }

    @Override // org.eclipse.ditto.client.live.LiveCommandProcessor
    public Logger getLogger() {
        return LOGGER;
    }

    private void handleLiveCommandOrResponse(Adaptable adaptable) {
        if (adaptable.getPayload().getHttpStatus().isPresent()) {
            this.messagingProvider.getAdaptableBus().publish(ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
        } else {
            handleLiveCommand(LiveCommandFactory.getInstance().getLiveCommand(PROTOCOL_ADAPTER.fromAdaptable(adaptable)));
        }
    }

    private void handleLiveCommand(LiveCommand<?, ?> liveCommand) {
        boolean z = false;
        Optional<ThingId> extractThingId = extractThingId(liveCommand);
        Optional<JsonKey> featureIdFromResourcePath = getFeatureIdFromResourcePath(liveCommand);
        if (extractThingId.isPresent() && featureIdFromResourcePath.isPresent()) {
            Optional<LiveFeatureHandle> filter = getFeatureHandle(extractThingId.get(), featureIdFromResourcePath.get().toString()).filter(liveFeatureHandle -> {
                return liveFeatureHandle instanceof LiveCommandProcessor;
            });
            Class<LiveCommandProcessor> cls = LiveCommandProcessor.class;
            Objects.requireNonNull(LiveCommandProcessor.class);
            z = ((Boolean) filter.map((v1) -> {
                return r1.cast(v1);
            }).map(liveCommandProcessor -> {
                return Boolean.valueOf(liveCommandProcessor.processLiveCommand(liveCommand));
            }).orElse(false)).booleanValue();
            LOGGER.debug("Live command of type '{}' handled with specific feature handle: {}", liveCommand.getType(), Boolean.valueOf(z));
        }
        if (!z && extractThingId.isPresent()) {
            Optional<LiveThingHandle> filter2 = getThingHandle(extractThingId.get()).filter(liveThingHandle -> {
                return liveThingHandle instanceof LiveCommandProcessor;
            });
            Class<LiveCommandProcessor> cls2 = LiveCommandProcessor.class;
            Objects.requireNonNull(LiveCommandProcessor.class);
            z = ((Boolean) filter2.map((v1) -> {
                return r1.cast(v1);
            }).map(liveCommandProcessor2 -> {
                return Boolean.valueOf(liveCommandProcessor2.processLiveCommand(liveCommand));
            }).orElse(false)).booleanValue();
            LOGGER.debug("Live command of type '{}' handled with specific thing handle: {}", liveCommand.getType(), Boolean.valueOf(z));
        }
        if (!z) {
            z = processLiveCommand(liveCommand);
            LOGGER.debug("Live command of type '{}' handled with global handle: {}", liveCommand.getType(), Boolean.valueOf(z));
        }
        if (z) {
            return;
        }
        LOGGER.warn("Incoming live command of type '{}'  was not processed.", liveCommand.getType());
    }

    private static Optional<ThingId> extractThingId(LiveCommand<?, ?> liveCommand) {
        Optional of = Optional.of(liveCommand);
        Class<WithThingId> cls = WithThingId.class;
        Objects.requireNonNull(WithThingId.class);
        return of.map((v1) -> {
            return r1.cast(v1);
        }).map((v0) -> {
            return v0.getEntityId();
        });
    }

    private Optional<JsonKey> getFeatureIdFromResourcePath(LiveCommand<?, ?> liveCommand) {
        return liveCommand.getResourcePath().get(1);
    }

    private static Message<?> adaptableAsLiveMessage(Adaptable adaptable) {
        MessageCommand fromAdaptable = PROTOCOL_ADAPTER.fromAdaptable(adaptable);
        return fromAdaptable instanceof MessageCommand ? fromAdaptable.getMessage() : ((MessageCommandResponse) fromAdaptable).getMessage();
    }

    @Override // org.eclipse.ditto.client.internal.AbstractHandle
    protected AcknowledgementLabel getThingResponseAcknowledgementLabel() {
        return DittoAcknowledgementLabel.LIVE_RESPONSE;
    }
}
