/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.task;

import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.broker.logstreams.processor.StreamProcessorService;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.system.executor.ScheduledCommand;
import io.zeebe.broker.system.executor.ScheduledExecutor;
import io.zeebe.broker.task.TaskQueueManager;
import io.zeebe.broker.task.TaskQueueServiceNames;
import io.zeebe.broker.task.TaskSubscriptionManager;
import io.zeebe.broker.task.processor.TaskExpireLockStreamProcessor;
import io.zeebe.broker.task.processor.TaskInstanceStreamProcessor;
import io.zeebe.broker.transport.clientapi.CommandResponseWriter;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.processor.StreamProcessorController;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ServerTransport;
import io.zeebe.util.DeferredCommandContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.actor.ActorReference;
import io.zeebe.util.actor.ActorScheduler;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Service that installs the task-queue stream processors for log streams.
 *
 * <p>For every log stream reported through {@link #getLogStreamsGroupReference()} the service
 * queues a deferred call to {@link #startTaskQueue(String)}. The deferred calls are executed
 * when the actor scheduler invokes {@link #doWork()}. Starting a task queue installs a
 * {@link TaskInstanceStreamProcessor} service and a {@link TaskExpireLockStreamProcessor}
 * service for the stream, and schedules a periodic lock-expiration check for the latter.
 */
public class TaskQueueManagerService implements Service<TaskQueueManager>, TaskQueueManager, Actor {
    protected static final String NAME = "task.queue.manager";

    /** Period, in seconds, between two lock-expiration checks of a task queue. */
    public static final int LOCK_EXPIRATION_INTERVAL = 30;

    protected final Injector<ServerTransport> clientApiTransportInjector = new Injector<>();
    protected final Injector<ScheduledExecutor> executorInjector = new Injector<>();
    protected final Injector<TaskSubscriptionManager> taskSubscriptionManagerInjector = new Injector<>();
    protected final Injector<ActorScheduler> actorSchedulerInjector = new Injector<>();

    // Explicit type witness: without it the chained builder call cannot infer LogStream
    // from the assignment target.
    protected final ServiceGroupReference<LogStream> logStreamsGroupReference = ServiceGroupReference.<LogStream>create()
        .onAdd((name, stream) -> this.addStream(stream, name))
        .build();

    protected ServiceStartContext serviceContext;
    protected DeferredCommandContext asyncContext;
    protected ActorReference actorRef;

    /**
     * The most recently scheduled lock-expiration check. Kept for subclasses that read it;
     * {@link #stop(ServiceStopContext)} cancels the complete set of scheduled checks instead.
     */
    protected ScheduledCommand scheduledCheckExpirationCmd;

    /**
     * Every lock-expiration check scheduled so far, one per started task queue. Entries are
     * added from the install callback and drained in {@link #stop(ServiceStopContext)};
     * a concurrent list is used because those two may not run on the same thread
     * (NOTE(review): confirm the threading model of the service container).
     */
    private final List<ScheduledCommand> scheduledCheckExpirationCmds = new CopyOnWriteArrayList<>();

    /**
     * Installs the task instance stream processor and the lock-expiration stream processor
     * for the log stream with the given name.
     *
     * @param logName name of the log stream the task queue operates on
     * @throws IllegalArgumentException if {@code logName} is {@code null} or empty
     */
    @Override
    public void startTaskQueue(String logName) {
        if (logName == null || logName.isEmpty()) {
            throw new IllegalArgumentException("Cannot start task queue: Mandatory configuration property 'logName' is not set.");
        }

        ServiceName<StreamProcessorController> streamProcessorServiceName = TaskQueueServiceNames.taskQueueInstanceStreamProcessorServiceName(logName);
        ServiceName<LogStream> logStreamServiceName = LogStreamServiceNames.logStreamServiceName(logName);

        ServerTransport serverTransport = this.clientApiTransportInjector.getValue();
        CommandResponseWriter responseWriter = new CommandResponseWriter(serverTransport.getOutput());
        SubscribedEventWriter subscribedEventWriter = new SubscribedEventWriter(serverTransport.getOutput());
        TaskSubscriptionManager taskSubscriptionManager = this.taskSubscriptionManagerInjector.getValue();

        TaskInstanceStreamProcessor taskInstanceStreamProcessor = new TaskInstanceStreamProcessor(responseWriter, subscribedEventWriter, taskSubscriptionManager);
        // NOTE(review): 10 is presumably the stream processor id - confirm against the id registry.
        StreamProcessorService taskInstanceStreamProcessorService = new StreamProcessorService(streamProcessorServiceName.getName(), 10, taskInstanceStreamProcessor)
            .eventFilter(TaskInstanceStreamProcessor.eventFilter());

        // The same log stream is wired as both source and target of the processor.
        this.serviceContext.createService(streamProcessorServiceName, taskInstanceStreamProcessorService)
            .group(TaskQueueServiceNames.TASK_QUEUE_STREAM_PROCESSOR_SERVICE_GROUP_NAME)
            .dependency(logStreamServiceName, taskInstanceStreamProcessorService.getSourceStreamInjector())
            .dependency(logStreamServiceName, taskInstanceStreamProcessorService.getTargetStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, taskInstanceStreamProcessorService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, taskInstanceStreamProcessorService.getActorSchedulerInjector())
            .install();

        this.startExpireLockService(logName, logStreamServiceName);
    }

    /**
     * Installs the lock-expiration stream processor for a log stream and, once the install
     * has completed, schedules {@code checkLockExpirationAsync} on the injected executor
     * every {@link #LOCK_EXPIRATION_INTERVAL} seconds.
     *
     * @param logStreamName name of the log stream
     * @param logStreamServiceName service name of the log stream, used as source and target
     */
    protected void startExpireLockService(String logStreamName, ServiceName<LogStream> logStreamServiceName) {
        ScheduledExecutor executor = this.executorInjector.getValue();
        ServiceName<StreamProcessorController> expireLockStreamProcessorServiceName = TaskQueueServiceNames.taskQueueExpireLockStreamProcessorServiceName(logStreamName);

        TaskExpireLockStreamProcessor expireLockStreamProcessor = new TaskExpireLockStreamProcessor();
        // NOTE(review): this 30 is presumably the stream processor id, not the expiration
        // interval, so it is intentionally not replaced by LOCK_EXPIRATION_INTERVAL - confirm.
        StreamProcessorService expireLockStreamProcessorService = new StreamProcessorService(expireLockStreamProcessorServiceName.getName(), 30, expireLockStreamProcessor)
            .eventFilter(TaskExpireLockStreamProcessor.eventFilter());

        this.serviceContext.createService(expireLockStreamProcessorServiceName, expireLockStreamProcessorService)
            .dependency(logStreamServiceName, expireLockStreamProcessorService.getSourceStreamInjector())
            .dependency(logStreamServiceName, expireLockStreamProcessorService.getTargetStreamInjector())
            .dependency(LogStreamServiceNames.SNAPSHOT_STORAGE_SERVICE, expireLockStreamProcessorService.getSnapshotStorageInjector())
            .dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, expireLockStreamProcessorService.getActorSchedulerInjector())
            .install()
            .thenRun(() -> {
                ScheduledCommand command = executor.scheduleAtFixedRate(expireLockStreamProcessor::checkLockExpirationAsync, Duration.ofSeconds(LOCK_EXPIRATION_INTERVAL));
                // Remember every command: this method runs once per log stream, and keeping
                // only the latest handle would leave the earlier schedules running after stop.
                this.scheduledCheckExpirationCmds.add(command);
                this.scheduledCheckExpirationCmd = command;
            });
    }

    /**
     * Stores the start context, creates the deferred command context and schedules this
     * service as an actor on the injected scheduler.
     *
     * @param serviceContext context used later to install the stream processor services
     */
    public void start(ServiceStartContext serviceContext) {
        this.serviceContext = serviceContext;
        this.asyncContext = new DeferredCommandContext();
        ActorScheduler actorScheduler = this.actorSchedulerInjector.getValue();
        this.actorRef = actorScheduler.schedule(this);
    }

    /**
     * Cancels all scheduled lock-expiration checks and closes the actor reference.
     *
     * @param ctx stop context on which the shutdown work is run
     */
    public void stop(ServiceStopContext ctx) {
        ctx.run(() -> {
            for (ScheduledCommand command : this.scheduledCheckExpirationCmds) {
                command.cancel();
            }
            this.scheduledCheckExpirationCmds.clear();
            this.actorRef.close();
        });
    }

    /**
     * @return this instance, which is itself the {@link TaskQueueManager}
     */
    public TaskQueueManager get() {
        return this;
    }

    public Injector<ServerTransport> getClientApiTransportInjector() {
        return this.clientApiTransportInjector;
    }

    public Injector<ScheduledExecutor> getExecutorInjector() {
        return this.executorInjector;
    }

    public Injector<TaskSubscriptionManager> getTaskSubscriptionManagerInjector() {
        return this.taskSubscriptionManagerInjector;
    }

    public ServiceGroupReference<LogStream> getLogStreamsGroupReference() {
        return this.logStreamsGroupReference;
    }

    public Injector<ActorScheduler> getActorSchedulerInjector() {
        return this.actorSchedulerInjector;
    }

    /**
     * Queues a deferred {@link #startTaskQueue(String)} call for the given log stream; the
     * call is executed on a later {@link #doWork()} invocation.
     *
     * @param logStream the log stream that was added
     * @param logStreamServiceName service name of the stream (currently unused)
     */
    public void addStream(LogStream logStream, ServiceName<LogStream> logStreamServiceName) {
        this.asyncContext.runAsync(r -> this.startTaskQueue(logStream.getLogName()));
    }

    /**
     * Runs the pending deferred commands.
     *
     * @return the value returned by {@link DeferredCommandContext#doWork()}
     */
    public int doWork() throws Exception {
        return this.asyncContext.doWork();
    }

    /**
     * @return the actor name, {@value #NAME}
     */
    public String name() {
        return NAME;
    }
}

