package org.dei.perla.core.fpc.base;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.function.Consumer;
import org.dei.perla.core.engine.Executor;
import org.dei.perla.core.engine.Script;
import org.dei.perla.core.engine.ScriptHandler;
import org.dei.perla.core.engine.ScriptParameter;
import org.dei.perla.core.fpc.Attribute;
import org.dei.perla.core.fpc.TaskHandler;
import org.dei.perla.core.message.FpcMessage;
import org.dei.perla.core.message.Mapper;
import org.dei.perla.core.utils.AsyncUtils;

/* loaded from: input_file:org/dei/perla/core/fpc/base/AsyncOperation.class */
public final class AsyncOperation extends BaseOperation<AsyncTask> {
    private static final int STOPPED = 0;
    private static final int SUSPENDED = 1;
    private static final int STARTED = 2;
    private static final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
    private final Script startScript;
    private int state;
    private final AsyncMessageHandler asyncHandler;
    private final OnHandler onHandler;
    private volatile Object[] sample;

    /* loaded from: input_file:org/dei/perla/core/fpc/base/AsyncOperation$AsyncMessageHandler.class */
    public static class AsyncMessageHandler {
        private final Mapper mapper;
        private final Script script;
        private final String variable;

        public AsyncMessageHandler(Mapper mapper, Script script, String str) {
            this.mapper = mapper;
            this.script = script;
            this.variable = str;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/dei/perla/core/fpc/base/AsyncOperation$AsyncTask.class */
    public class AsyncTask extends BaseTask {
        public AsyncTask(BaseOperation<?> baseOperation, TaskHandler taskHandler, SamplePipeline samplePipeline) {
            super(baseOperation, taskHandler, samplePipeline);
        }
    }

    /* loaded from: input_file:org/dei/perla/core/fpc/base/AsyncOperation$OnHandler.class */
    private class OnHandler implements ScriptHandler {
        private OnHandler() {
        }

        @Override // org.dei.perla.core.engine.ScriptHandler
        public void complete(Script script, List<Object[]> list) {
            synchronized (AsyncOperation.this) {
                list.forEach(objArr -> {
                    AsyncOperation.this.forEachTask(asyncTask -> {
                        asyncTask.processSample(objArr);
                    });
                });
                int size = list.size() - AsyncOperation.SUSPENDED;
                AsyncOperation.this.sample = list.get(size);
            }
        }

        @Override // org.dei.perla.core.engine.ScriptHandler
        public void error(Script script, Throwable th) {
            synchronized (AsyncOperation.this) {
                AsyncOperation.this.log.error("Execution error in 'on' script", th);
                AsyncOperation.this.forEachTask(asyncTask -> {
                    asyncTask.notifyError(th, false);
                });
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/dei/perla/core/fpc/base/AsyncOperation$StartHandler.class */
    public class StartHandler implements ScriptHandler {
        private StartHandler() {
        }

        @Override // org.dei.perla.core.engine.ScriptHandler
        public void complete(Script script, List<Object[]> list) {
            synchronized (AsyncOperation.this) {
                AsyncOperation.this.state = AsyncOperation.STARTED;
            }
        }

        @Override // org.dei.perla.core.engine.ScriptHandler
        public void error(Script script, Throwable th) {
            synchronized (AsyncOperation.this) {
                if (AsyncOperation.this.state == AsyncOperation.SUSPENDED) {
                    return;
                }
                AsyncOperation.this.state = AsyncOperation.SUSPENDED;
                AsyncOperation.this.log.error("Error starting asynchronous operation", th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AsyncOperation(String str, List<Attribute> list, Script script, AsyncMessageHandler asyncMessageHandler, ChannelManager channelManager) {
        super(str, list);
        this.onHandler = new OnHandler();
        this.startScript = script;
        this.asyncHandler = asyncMessageHandler;
        this.sample = new Object[list.size()];
        this.state = STOPPED;
        channelManager.addCallback(this.asyncHandler.mapper, this::handleMessage);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void start() {
        runStartScript();
    }

    private void runStartScript() {
        if (this.startScript != null) {
            Executor.execute(this.startScript, new StartHandler());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized Object[] getSampleCopy() {
        return Arrays.copyOf(this.sample, this.sample.length);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.dei.perla.core.fpc.base.BaseOperation
    public AsyncTask doSchedule(Map<String, Object> map, TaskHandler taskHandler, SamplePipeline samplePipeline) throws IllegalArgumentException {
        AsyncTask asyncTask = new AsyncTask(this, taskHandler, samplePipeline);
        add(asyncTask);
        return asyncTask;
    }

    public void handleMessage(FpcMessage fpcMessage) {
        Executor.execute(this.asyncHandler.script, new ScriptParameter[]{new ScriptParameter(this.asyncHandler.variable, fpcMessage)}, this.onHandler);
    }

    @Override // org.dei.perla.core.fpc.base.BaseOperation
    public void doStop() {
        this.state = STOPPED;
    }

    @Override // org.dei.perla.core.fpc.base.BaseOperation
    public void doStop(Consumer<Operation> consumer) {
        doStop();
        AsyncUtils.runInNewThread(() -> {
            consumer.accept(this);
        });
    }

    @Override // org.dei.perla.core.fpc.base.BaseOperation
    public /* bridge */ /* synthetic */ AsyncTask doSchedule(Map map, TaskHandler taskHandler, SamplePipeline samplePipeline) throws IllegalArgumentException {
        return doSchedule((Map<String, Object>) map, taskHandler, samplePipeline);
    }

    static {
        executor.setRemoveOnCancelPolicy(true);
    }
}
