/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.ditto.internal.utils.persistence.operations;

import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReason;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespaceResponse;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.persistence.operations.EntityPersistenceOperations;
import org.eclipse.ditto.internal.utils.persistence.operations.NamespacePersistenceOperations;
import org.eclipse.ditto.internal.utils.persistence.operations.PersistenceOperationsConfig;
import scala.concurrent.ExecutionContext;

/**
 * Base actor that executes purge operations against a persistence on request.
 *
 * <p>On start the actor subscribes via the pub-sub mediator for {@code PurgeNamespace} commands (if
 * namespace operations were supplied) and for {@code PurgeEntities} commands of its entity type (if
 * entity operations were supplied). Each handled command is answered to its sender with a
 * successful or failed response, depending on whether the purge reported any errors.
 *
 * <p>Any {@link Closeable}s handed to the constructor are closed when the actor stops.
 */
public abstract class AbstractPersistenceOperationsActor extends AbstractActor {
    /** Logger of this actor; exposed to subclasses. */
    protected final ThreadSafeDittoLoggingAdapter logger;
    private final ActorRef pubSubMediator;
    private final EntityType entityType;
    @Nullable
    private final NamespacePersistenceOperations namespaceOps;
    @Nullable
    private final EntityPersistenceOperations entitiesOps;
    private final Materializer materializer;
    private final Collection<Closeable> toCloseWhenStopped;
    private final Duration delayAfterPersistenceActorShutdown;

    /**
     * Canonical constructor all protected constructors delegate to.
     *
     * @param pubSubMediator the pub-sub mediator used for subscribing and publishing.
     * @param entityType the type of entities this actor is responsible for.
     * @param namespaceOps namespace-level operations or {@code null} if unsupported.
     * @param entitiesOps entity-level operations or {@code null} if unsupported.
     * @param persistenceOperationsConfig supplies the delay between shutting down persistence actors
     * and purging their entities.
     * @param toCloseWhenStopped resources to close in {@link #postStop()}; an immutable snapshot is
     * taken via {@link List#copyOf(Collection)}, which rejects {@code null} elements.
     * @throws IllegalArgumentException if both {@code namespaceOps} and {@code entitiesOps} are
     * {@code null}.
     */
    private AbstractPersistenceOperationsActor(ActorRef pubSubMediator, EntityType entityType, @Nullable NamespacePersistenceOperations namespaceOps, @Nullable EntityPersistenceOperations entitiesOps, PersistenceOperationsConfig persistenceOperationsConfig, Collection<Closeable> toCloseWhenStopped) {
        // NOTE(review): checkNotNull presumably throws NullPointerException for null arguments - confirm against ConditionChecker.
        this.pubSubMediator = (ActorRef)ConditionChecker.checkNotNull((Object)pubSubMediator, (String)"pub-sub mediator");
        this.entityType = (EntityType)ConditionChecker.checkNotNull((Object)entityType, (String)"entityType");
        if (namespaceOps == null && entitiesOps == null) {
            throw new IllegalArgumentException("At least one of namespaceOps or entitiesOps must be specified.");
        }
        this.namespaceOps = namespaceOps;
        this.entitiesOps = entitiesOps;
        this.toCloseWhenStopped = List.copyOf(toCloseWhenStopped);
        this.materializer = Materializer.createMaterializer(() -> this.getContext());
        this.delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter((Actor)this);
    }

    /**
     * Constructs the actor with at least one resource that is closed when the actor stops.
     *
     * @param pubSubMediator the pub-sub mediator used for subscribing and publishing.
     * @param entityType the type of entities this actor is responsible for.
     * @param namespaceOps namespace-level operations or {@code null} if unsupported.
     * @param entitiesOps entity-level operations or {@code null} if unsupported.
     * @param persistenceOperationsConfig configuration of the persistence operations.
     * @param toCloseWhenStopped mandatory resource to close when the actor stops.
     * @param optionalToCloseWhenStopped further resources to close when the actor stops.
     * @throws IllegalArgumentException if both {@code namespaceOps} and {@code entitiesOps} are
     * {@code null}.
     */
    protected AbstractPersistenceOperationsActor(ActorRef pubSubMediator, EntityType entityType, @Nullable NamespacePersistenceOperations namespaceOps, @Nullable EntityPersistenceOperations entitiesOps, PersistenceOperationsConfig persistenceOperationsConfig, Closeable toCloseWhenStopped, Closeable ... optionalToCloseWhenStopped) {
        this(pubSubMediator, entityType, namespaceOps, entitiesOps, persistenceOperationsConfig, AbstractPersistenceOperationsActor.toList(toCloseWhenStopped, optionalToCloseWhenStopped));
    }

    /**
     * Combines the mandatory and the optional closeables into one list, mandatory one first.
     *
     * @param toCloseWhenStopped the mandatory closeable.
     * @param optionalToCloseWhenStopped the optional closeables.
     * @return a new list holding all passed closeables in argument order.
     */
    private static List<Closeable> toList(Closeable toCloseWhenStopped, Closeable ... optionalToCloseWhenStopped) {
        ConditionChecker.checkNotNull((Object)toCloseWhenStopped, (String)"Closeable");
        ConditionChecker.checkNotNull((Object)optionalToCloseWhenStopped, (String)"optional Closeables");
        List<Closeable> closeables = new ArrayList<>(1 + optionalToCloseWhenStopped.length);
        closeables.add(toCloseWhenStopped);
        Collections.addAll(closeables, optionalToCloseWhenStopped);
        return closeables;
    }

    /**
     * Constructs the actor without any resources to close when it stops.
     *
     * @param pubSubMediator the pub-sub mediator used for subscribing and publishing.
     * @param entityType the type of entities this actor is responsible for.
     * @param namespaceOps namespace-level operations or {@code null} if unsupported.
     * @param entitiesOps entity-level operations or {@code null} if unsupported.
     * @param persistenceOperationsConfig configuration of the persistence operations.
     * @throws IllegalArgumentException if both {@code namespaceOps} and {@code entitiesOps} are
     * {@code null}.
     */
    protected AbstractPersistenceOperationsActor(ActorRef pubSubMediator, EntityType entityType, @Nullable NamespacePersistenceOperations namespaceOps, @Nullable EntityPersistenceOperations entitiesOps, PersistenceOperationsConfig persistenceOperationsConfig) {
        this(pubSubMediator, entityType, namespaceOps, entitiesOps, persistenceOperationsConfig, Collections.emptyList());
    }

    /** Subscribes for the command topics this actor is able to handle. */
    @Override
    public void preStart() {
        this.subscribeForNamespaceCommands();
        this.subscribeForEntitiesCommands();
    }

    /**
     * Closes all registered resources and then delegates to the super implementation.
     *
     * <p>A failure to close one resource is logged and does not prevent the remaining resources
     * from being closed.
     */
    @Override
    public void postStop() throws Exception {
        try {
            for (Closeable closeable : this.toCloseWhenStopped) {
                try {
                    closeable.close();
                }
                catch (IOException | RuntimeException e) {
                    // Runtime failures are caught as well so that one broken resource cannot leak the others.
                    String reason = e.getMessage() != null ? e.getMessage() : e.toString();
                    this.logger.warning("Failed to close: <{}>!", (Object)reason);
                }
            }
        }
        finally {
            super.postStop();
        }
    }

    /** Subscribes for namespace purge commands if namespace operations are available. */
    private void subscribeForNamespaceCommands() {
        if (null != this.namespaceOps) {
            this.logger.debug("Subscribing for namespace commands.");
            ActorRef self = this.getSelf();
            DistributedPubSubMediator.Subscribe subscribe = DistPubSubAccess.subscribeViaGroup((String)"namespaces.commands:purgeNamespace", (String)this.getSubscribeGroup(), (ActorRef)self);
            this.pubSubMediator.tell((Object)subscribe, self);
        }
    }

    /** Subscribes for entity purge commands of this actor's entity type if entity operations are available. */
    private void subscribeForEntitiesCommands() {
        if (null != this.entitiesOps) {
            ActorRef self = this.getSelf();
            String topic = PurgeEntities.getTopic((EntityType)this.entityType);
            DistributedPubSubMediator.Subscribe subscribe = DistPubSubAccess.subscribeViaGroup((String)topic, (String)this.getSubscribeGroup(), (ActorRef)self);
            this.logger.debug("Subscribing for entities commands on topic <{}>.", (Object)topic);
            this.pubSubMediator.tell((Object)subscribe, self);
        }
    }

    /**
     * @return the pub-sub group name of this actor, which is its own actor path without address.
     */
    private String getSubscribeGroup() {
        return this.getSelf().path().toStringWithoutAddress();
    }

    /**
     * Handles {@code PurgeNamespace}, {@code PurgeEntities} and subscription acknowledgements; any
     * other message is logged as unhandled.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(PurgeNamespace.class, this::purgeNamespace)
                .match(PurgeEntities.class, this::purgeEntities)
                .match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck)
                .matchAny(message -> this.logger.warning("unhandled: <{}>", message))
                .build();
    }

    /**
     * Purges the namespace named by the command and replies to the sender with a successful or
     * failed {@code PurgeNamespaceResponse}.
     *
     * @param purgeNamespace the command to handle.
     */
    private void purgeNamespace(PurgeNamespace purgeNamespace) {
        ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId((WithDittoHeaders)purgeNamespace);
        if (null == this.namespaceOps) {
            l.warning("Cannot handle namespace command: <{}>!", (Object)purgeNamespace);
            return;
        }
        l.info("Running <{}>.", (Object)purgeNamespace);
        String namespace = purgeNamespace.getNamespace();
        // Captured before going asynchronous; the callbacks below run after this method has returned.
        ActorRef sender = this.getSender();
        ((CompletionStage)this.namespaceOps.purge(namespace).runWith((Graph)Sink.head(), this.materializer)).thenAccept(errors -> {
            boolean purged = errors.isEmpty();
            PurgeNamespaceResponse response;
            if (purged) {
                response = PurgeNamespaceResponse.successful((CharSequence)namespace, (CharSequence)this.entityType, (DittoHeaders)purgeNamespace.getDittoHeaders());
            } else {
                errors.forEach(error -> l.error(error, "Error purging namespace <{}>!", (Object)namespace));
                response = PurgeNamespaceResponse.failed((CharSequence)namespace, (CharSequence)this.entityType, (DittoHeaders)purgeNamespace.getDittoHeaders());
            }
            sender.tell((Object)response, this.getSelf());
            // Only claim success when the purge reported no errors.
            if (purged) {
                l.info("Successfully purged namespace <{}>.", (Object)namespace);
            }
        }).exceptionally(error -> {
            l.error(error, "Unexpected error when purging namespace <{}>!", (Object)namespace);
            return null;
        });
    }

    /**
     * Validates the command, publishes a shutdown for the persistence actors of the affected
     * entities and schedules the actual purge after the configured delay.
     *
     * @param purgeEntities the command to handle.
     */
    private void purgeEntities(PurgeEntities purgeEntities) {
        ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId((WithDittoHeaders)purgeEntities);
        if (null == this.entitiesOps) {
            l.warning("Cannot handle entities command: <{}>.", (Object)purgeEntities);
            return;
        }
        if (!this.entityType.equals(purgeEntities.getEntityType())) {
            l.warning("Expected command with entityType <{}>, but got: <{}>.", (Object)this.entityType, (Object)purgeEntities);
            return;
        }
        this.shutDownPersistenceActorsOfEntitiesToPurge(purgeEntities);
        this.schedulePurgingEntitiesIn(this.delayAfterPersistenceActorShutdown, purgeEntities);
    }

    /**
     * Publishes a {@code Shutdown} command, carrying the IDs of the entities to purge as reason,
     * via the pub-sub mediator.
     *
     * @param purgeEntities the command naming the entities to purge.
     */
    private void shutDownPersistenceActorsOfEntitiesToPurge(PurgeEntities purgeEntities) {
        ShutdownReason reason = ShutdownReasonFactory.getPurgeEntitiesReason((List)purgeEntities.getEntityIds());
        Shutdown shutdown = Shutdown.getInstance((ShutdownReason)reason, (DittoHeaders)purgeEntities.getDittoHeaders());
        this.pubSubMediator.tell((Object)DistPubSubAccess.publish((String)shutdown.getType(), (Object)shutdown), this.getSelf());
    }

    /**
     * Schedules the purge of the entities once, after the given delay.
     *
     * @param delay how long to wait before purging.
     * @param purgeEntities the command naming the entities to purge.
     */
    private void schedulePurgingEntitiesIn(Duration delay, PurgeEntities purgeEntities) {
        // The sender is resolved now because the scheduled task runs later, outside of this message's handling.
        ActorRef initiator = this.getSender();
        this.getContext().system().scheduler().scheduleOnce(delay, () -> this.doPurgeEntities(purgeEntities, initiator), (ExecutionContext)this.getContext().dispatcher());
    }

    /**
     * Purges the entities named by the command and replies to the initiator with a successful or
     * failed {@code PurgeEntitiesResponse}.
     *
     * @param purgeEntities the command naming the entities to purge.
     * @param initiator the actor that receives the response.
     */
    private void doPurgeEntities(PurgeEntities purgeEntities, ActorRef initiator) {
        ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId((WithDittoHeaders)purgeEntities);
        if (null == this.entitiesOps) {
            l.warning("Cannot handle entities command: <{}>", (Object)purgeEntities);
            return;
        }
        l.info("Running <{}>.", (Object)purgeEntities);
        EntityType purgeEntityType = purgeEntities.getEntityType();
        // Only used for logging, hence the wildcard element type.
        List<?> entityIds = purgeEntities.getEntityIds();
        ((CompletionStage)this.entitiesOps.purgeEntities(purgeEntities.getEntityIds()).runWith((Graph)Sink.head(), this.materializer)).thenAccept(errors -> {
            boolean purged = errors.isEmpty();
            PurgeEntitiesResponse response;
            if (purged) {
                response = PurgeEntitiesResponse.successful((EntityType)purgeEntityType, (DittoHeaders)purgeEntities.getDittoHeaders());
            } else {
                errors.forEach(error -> l.error(error, "Error purging entities of type <{}>: <{}>", (Object)purgeEntityType, (Object)entityIds));
                response = PurgeEntitiesResponse.failed((EntityType)purgeEntityType, (DittoHeaders)purgeEntities.getDittoHeaders());
            }
            initiator.tell((Object)response, this.getSelf());
            // Only claim success when the purge reported no errors.
            if (purged) {
                l.info("Successfully purged entities of type <{}>: <{}>", (Object)purgeEntityType, (Object)entityIds);
            }
        }).exceptionally(error -> {
            l.error(error, "Unexpected error when purging entities <{}>!", (Object)entityIds);
            return null;
        });
    }

    /**
     * Logs the acknowledgement of a pub-sub subscription.
     *
     * @param subscribeAck the received acknowledgement.
     */
    private void handleSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        this.logger.debug("Got subscribeAck <{}>.", (Object)subscribeAck);
    }
}

