package se.arkalix.core.plugin.eh;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.arkalix.ArSystem;
import se.arkalix.core.plugin.CloudException;
import se.arkalix.core.plugin.ErrorResponseException;
import se.arkalix.core.plugin.SystemDetails;
import se.arkalix.core.plugin.SystemDetailsDto;
import se.arkalix.description.ProviderDescription;
import se.arkalix.description.ServiceDescription;
import se.arkalix.descriptor.EncodingDescriptor;
import se.arkalix.net.http.HttpStatus;
import se.arkalix.net.http.service.HttpService;
import se.arkalix.plugin.Plugin;
import se.arkalix.plugin.PluginAttached;
import se.arkalix.plugin.PluginFacade;
import se.arkalix.security.access.AccessPolicy;
import se.arkalix.util.concurrent.Future;
import se.arkalix.util.concurrent.Futures;

/* loaded from: input_file:se/arkalix/core/plugin/eh/HttpJsonEventSubscriberPlugin.class */
public class HttpJsonEventSubscriberPlugin implements ArEventSubscriberPlugin {
    private static final Logger logger = LoggerFactory.getLogger(HttpJsonEventSubscriberPlugin.class);
    private final List<EventSubscription> defaultSubscriptions = new CopyOnWriteArrayList();

    /* loaded from: input_file:se/arkalix/core/plugin/eh/HttpJsonEventSubscriberPlugin$Attached.class */
    private static class Attached implements PluginAttached {
        private final String basePath;
        private final ArSystem system;
        private final SystemDetailsDto subscriber;
        private final Facade facade = new Facade();
        private final ConcurrentHashMap<String, Topic> nameToTopic = new ConcurrentHashMap<>();
        private final AtomicBoolean isDetached = new AtomicBoolean(false);

        /* loaded from: input_file:se/arkalix/core/plugin/eh/HttpJsonEventSubscriberPlugin$Attached$Facade.class */
        private class Facade implements ArEventSubscriberPluginFacade {
            private Facade() {
            }

            @Override // se.arkalix.core.plugin.eh.ArEventSubscriberPluginFacade
            public Future<EventSubscriptionHandle> subscribe(EventSubscription eventSubscription) {
                return Attached.this.subscribe(eventSubscription);
            }
        }

        Attached(ArSystem arSystem, List<EventSubscription> list) {
            HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber attaching to system \"{}\" ...", arSystem.name());
            this.system = arSystem;
            this.subscriber = SystemDetails.from(this.system);
            this.basePath = "/events/" + this.system.name();
            arSystem.consume().using(HttpJsonEventSubscribeService.factory()).flatMap(httpJsonEventSubscribeService -> {
                if (HttpJsonEventSubscriberPlugin.logger.isInfoEnabled()) {
                    ServiceDescription service = httpJsonEventSubscribeService.service();
                    ProviderDescription provider = httpJsonEventSubscribeService.service().provider();
                    HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber can now receive events for system \"{}\" via \"{}\" provided by \"{}\" {} ...", new Object[]{arSystem.name(), service.name(), provider.name(), provider.socketAddress()});
                }
                return arSystem.provide(new HttpService().name("event-subscriber").basePath(this.basePath).accessPolicy(AccessPolicy.whitelist(new String[]{httpJsonEventSubscribeService.service().provider().name()})).encodings(new EncodingDescriptor[]{EncodingDescriptor.JSON}).post("/#topic", (httpServiceRequest, httpServiceResponse) -> {
                    String pathParameter = httpServiceRequest.pathParameter(0);
                    return httpServiceRequest.bodyAs(EventIncomingDto.class).ifSuccess(eventIncomingDto -> {
                        try {
                            Topic topic = this.nameToTopic.get(pathParameter.toLowerCase());
                            if (topic != null) {
                                topic.publish((ProviderDescription) eventIncomingDto.publisher().map((v0) -> {
                                    return v0.toProviderDescription();
                                }).orElse(null), eventIncomingDto.metadata(), eventIncomingDto.data());
                            } else if (HttpJsonEventSubscriberPlugin.logger.isWarnEnabled()) {
                                HttpJsonEventSubscriberPlugin.logger.warn("HTTP/JSON event subscriber received unexpected event [topic=" + pathParameter + "]: {}", eventIncomingDto);
                            }
                        } finally {
                            httpServiceResponse.status(HttpStatus.OK);
                        }
                    }).mapCatch(Throwable.class, th -> {
                        if (HttpJsonEventSubscriberPlugin.logger.isWarnEnabled()) {
                            HttpJsonEventSubscriberPlugin.logger.warn("HTTP/JSON event subscriber failed to handle received event [topic=" + pathParameter + "]", th);
                        }
                        if (!httpServiceResponse.status().isEmpty()) {
                            return null;
                        }
                        httpServiceResponse.status(HttpStatus.BAD_REQUEST);
                        return null;
                    });
                })).flatMap(arServiceHandle -> {
                    return Futures.serialize(list.stream().map(this::subscribe));
                }).mapFault(Throwable.class, th -> {
                    return new CloudException("HTTP/JSON event subscriber failed to setup event receiver for the \"" + arSystem.name() + "\" system", th);
                });
            }).ifSuccess(list2 -> {
                if (HttpJsonEventSubscriberPlugin.logger.isInfoEnabled()) {
                    HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber attached to system \"{}\" and registered all default event subscriptions", arSystem.name());
                }
            }).onFailure(th -> {
                if (HttpJsonEventSubscriberPlugin.logger.isErrorEnabled()) {
                    HttpJsonEventSubscriberPlugin.logger.error("HTTP/JSON event subscriber failed to attach to system \"" + arSystem.name() + "\"", th);
                }
            });
        }

        private Future<EventSubscriptionHandle> subscribe(EventSubscription eventSubscription) {
            if (this.isDetached.get()) {
                throw new IllegalStateException("Plugin \"" + HttpJsonEventSubscriberPlugin.class + "\" is detached from its system \"" + this.system.name() + "\"; cannot create subscription");
            }
            String lowerCase = eventSubscription.topic().orElseThrow(() -> {
                return new IllegalArgumentException(eventSubscription + " does not specify a topic; cannot register subscription");
            }).toLowerCase();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicReference atomicReference = new AtomicReference(null);
            this.nameToTopic.compute(lowerCase, (str, topic) -> {
                if (topic == null) {
                    topic = new Topic(str, this::unsubscribe);
                    atomicBoolean.set(true);
                }
                atomicReference.set(topic.register(eventSubscription));
                HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber registered {} to topic \"{}\" for system \"{}\"", new Object[]{eventSubscription, str, this.system.name()});
                return topic;
            });
            if (!atomicBoolean.get()) {
                return Future.success((EventSubscriptionHandle) atomicReference.get());
            }
            EventSubscriptionRequestDto subscriptionRequest = eventSubscription.toSubscriptionRequest(this.subscriber, this.basePath + "/" + lowerCase);
            HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber is subscribing system \"{}\" to topic \"{}\" ...", this.system.name(), lowerCase);
            return this.system.consume().using(HttpJsonEventSubscribeService.factory()).flatMap(httpJsonEventSubscribeService -> {
                return httpJsonEventSubscribeService.subscribe(subscriptionRequest).flatMapCatch(ErrorResponseException.class, errorResponseException -> {
                    return "INVALID_PARAMETER".equals(errorResponseException.error().type()) ? this.system.consume().using(HttpJsonEventUnsubscribeService.factory()).flatMap(httpJsonEventUnsubscribeService -> {
                        return httpJsonEventUnsubscribeService.unsubscribe(lowerCase, this.system);
                    }).flatMap(obj -> {
                        return httpJsonEventSubscribeService.subscribe(subscriptionRequest);
                    }).pass((Object) null) : Future.failure(errorResponseException);
                });
            }).map(obj -> {
                if (HttpJsonEventSubscriberPlugin.logger.isInfoEnabled()) {
                    HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber did subscribe system \"{}\" to topic \"{}\"", this.system.name(), lowerCase);
                }
                return (EventSubscriptionHandle) atomicReference.get();
            }).ifFailure(Throwable.class, th -> {
                if (HttpJsonEventSubscriberPlugin.logger.isWarnEnabled()) {
                    HttpJsonEventSubscriberPlugin.logger.warn("HTTP/JSON event subscriber did fail to subscribe system \"" + this.system.name() + "\" to topic \"" + lowerCase + "\"", th);
                }
                this.nameToTopic.remove(lowerCase);
            });
        }

        private void unsubscribe(String str) {
            HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber is unsubscribing system \"{}\" from topic \"{}\" ...", this.system.name(), str);
            this.system.consume().using(HttpJsonEventUnsubscribeService.factory()).flatMap(httpJsonEventUnsubscribeService -> {
                return httpJsonEventUnsubscribeService.unsubscribe(str, this.system);
            }).ifSuccess(obj -> {
                HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber unsubscribed system \"{}\" from topic \"{}\"", this.system.name(), str);
            }).onFailure(th -> {
                HttpJsonEventSubscriberPlugin.logger.warn("HTTP/JSON event subscriber failed to unsubscribe system \"" + this.system.name() + "\" from topic \"" + str + "\"", th);
            });
        }

        public Optional<PluginFacade> facade() {
            return Optional.of(this.facade);
        }

        public void onDetach() {
            if (this.isDetached.getAndSet(true)) {
                return;
            }
            if (HttpJsonEventSubscriberPlugin.logger.isInfoEnabled()) {
                HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber is detaching from system \"{}\" and unregistering its event subscriptions ...", this.system.name());
            }
            this.system.consume().using(HttpJsonEventUnsubscribeService.factory()).ifSuccess(httpJsonEventUnsubscribeService -> {
                Iterator it = this.nameToTopic.keySet().iterator();
                while (it.hasNext()) {
                    unsubscribe((String) it.next());
                }
                this.nameToTopic.clear();
                HttpJsonEventSubscriberPlugin.logger.info("HTTP/JSON event subscriber detached from system \"{}\"", this.system.name());
            }).onFailure(th -> {
                if (HttpJsonEventSubscriberPlugin.logger.isWarnEnabled()) {
                    HttpJsonEventSubscriberPlugin.logger.warn("HTTP/JSON event subscriber failed to unregister the event subscriptions of the \"" + this.system.name() + "\" system", th);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:se/arkalix/core/plugin/eh/HttpJsonEventSubscriberPlugin$Handle.class */
    public static class Handle implements EventSubscriptionHandle {
        private final EventSubscriptionHandler handler;
        private final Map<String, String> metadata;
        private final Set<ProviderDescription> providers;
        private final Consumer<Handle> onUnsubscribe;
        private final AtomicBoolean isUnsubscribed = new AtomicBoolean(false);

        private Handle(EventSubscription eventSubscription, Consumer<Handle> consumer) {
            Objects.requireNonNull(eventSubscription, "Expected subscription");
            this.onUnsubscribe = (Consumer) Objects.requireNonNull(consumer, "Expected onUnsubscribe");
            this.handler = eventSubscription.handler().orElseThrow(() -> {
                return new IllegalArgumentException(eventSubscription + " does not contain an event handler; cannot register subscription");
            });
            this.metadata = eventSubscription.metadata().isEmpty() ? null : eventSubscription.metadata();
            this.providers = eventSubscription.providers().isEmpty() ? null : eventSubscription.providers();
        }

        public void publish(ProviderDescription providerDescription, Map<String, String> map, String str) {
            if (this.providers == null || this.providers.contains(providerDescription)) {
                if (this.metadata != null) {
                    for (Map.Entry<String, String> entry : this.metadata.entrySet()) {
                        String str2 = map.get(entry.getKey());
                        if (str2 == null || !str2.equals(entry.getValue())) {
                            return;
                        }
                    }
                }
                this.handler.onPublish(map, str);
            }
        }

        @Override // se.arkalix.core.plugin.eh.EventSubscriptionHandle
        public void unsubscribe() {
            if (this.isUnsubscribed.compareAndSet(false, true)) {
                this.onUnsubscribe.accept(this);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:se/arkalix/core/plugin/eh/HttpJsonEventSubscriberPlugin$Topic.class */
    public static class Topic {
        private final String name;
        private final Consumer<String> onEmpty;
        private final Set<Handle> handles = Collections.synchronizedSet(new HashSet());

        private Topic(String str, Consumer<String> consumer) {
            this.name = (String) Objects.requireNonNull(str, "Expected name");
            this.onEmpty = (Consumer) Objects.requireNonNull(consumer, "Expected onEmpty");
        }

        public String name() {
            return this.name;
        }

        public void publish(ProviderDescription providerDescription, Map<String, String> map, String str) {
            Iterator<Handle> it = this.handles.iterator();
            while (it.hasNext()) {
                try {
                    it.next().publish(providerDescription, map, str);
                } catch (Throwable th) {
                    HttpJsonEventSubscriberPlugin.logger.error("HTTP/JSON event subscription threw unexpected exception while handling event [topic=" + this.name + "]", th);
                }
            }
        }

        public EventSubscriptionHandle register(EventSubscription eventSubscription) {
            Handle handle = new Handle(eventSubscription, this::remove);
            this.handles.add(handle);
            return handle;
        }

        private void remove(Handle handle) {
            if (this.handles.remove(handle) && this.handles.isEmpty()) {
                this.onEmpty.accept(this.name);
            }
        }
    }

    @Override // se.arkalix.core.plugin.eh.ArEventSubscriberPlugin
    public HttpJsonEventSubscriberPlugin subscribe(EventSubscription eventSubscription) {
        this.defaultSubscriptions.add(eventSubscription);
        return this;
    }

    public Future<PluginAttached> attachTo(ArSystem arSystem, Map<Class<? extends Plugin>, PluginFacade> map) {
        return Future.success(new Attached(arSystem, this.defaultSubscriptions));
    }
}
