/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import lombok.Generated;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.ContextImpl;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.JavaExecutionResult;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaInstance
implements AutoCloseable {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(JavaInstance.class);
    private final ContextImpl context;
    private Function function;
    private java.util.function.Function javaUtilFunction;
    private final InstanceConfig instanceConfig;
    private final ExecutorService executor;
    private final LinkedBlockingQueue<AsyncFuncRequest> pendingAsyncRequests;
    private final Semaphore asyncRequestsConcurrencyLimiter;
    private final boolean asyncPreserveInputOrderForOutputMessages;

    public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {
        this.context = contextImpl;
        this.instanceConfig = instanceConfig;
        this.executor = Executors.newSingleThreadExecutor();
        this.asyncPreserveInputOrderForOutputMessages = this.resolveAsyncPreserveInputOrderForOutputMessages(instanceConfig);
        if (this.asyncPreserveInputOrderForOutputMessages) {
            this.pendingAsyncRequests = new LinkedBlockingQueue(this.instanceConfig.getMaxPendingAsyncRequests());
            this.asyncRequestsConcurrencyLimiter = null;
        } else {
            this.pendingAsyncRequests = null;
            this.asyncRequestsConcurrencyLimiter = new Semaphore(this.instanceConfig.getMaxPendingAsyncRequests());
        }
        if (userClassObject instanceof Function) {
            this.function = (Function)userClassObject;
        } else {
            this.javaUtilFunction = (java.util.function.Function)userClassObject;
        }
    }

    private boolean resolveAsyncPreserveInputOrderForOutputMessages(InstanceConfig instanceConfig) {
        boolean voidReturnType;
        boolean bl = voidReturnType = instanceConfig.getFunctionDetails() != null && instanceConfig.getFunctionDetails().getSink() != null && Void.class.getName().equals(instanceConfig.getFunctionDetails().getSink().getTypeClassName());
        return !voidReturnType;
    }

    @VisibleForTesting
    public JavaExecutionResult handleMessage(Record<?> record, Object input) {
        return this.handleMessage(record, input, (rec, result) -> {}, cause -> {});
    }

    public JavaExecutionResult handleMessage(Record<?> record, Object input, JavaInstanceRunnable.AsyncResultConsumer asyncResultConsumer, Consumer<Throwable> asyncFailureHandler) {
        Object output;
        if (this.context != null) {
            this.context.setCurrentMessageContext(record);
        }
        JavaExecutionResult executionResult = new JavaExecutionResult();
        try {
            output = this.function != null ? this.function.process(input, (Context)this.context) : this.javaUtilFunction.apply(input);
        }
        catch (Exception ex) {
            executionResult.setUserException(ex);
            return executionResult;
        }
        if (output instanceof CompletableFuture) {
            try {
                if (this.asyncPreserveInputOrderForOutputMessages) {
                    AsyncFuncRequest request = new AsyncFuncRequest(record, (CompletableFuture)output);
                    this.pendingAsyncRequests.put(request);
                } else {
                    this.asyncRequestsConcurrencyLimiter.acquire();
                }
                ((CompletableFuture)output).whenCompleteAsync((res, cause) -> {
                    block8: {
                        try {
                            if (this.asyncPreserveInputOrderForOutputMessages) {
                                this.processAsyncResultsInInputOrder(asyncResultConsumer);
                                break block8;
                            }
                            try {
                                JavaExecutionResult execResult = new JavaExecutionResult();
                                if (cause != null) {
                                    execResult.setUserException(FutureUtil.unwrapCompletionException((Throwable)cause));
                                } else {
                                    execResult.setResult(res);
                                }
                                asyncResultConsumer.accept(record, execResult);
                            }
                            finally {
                                this.asyncRequestsConcurrencyLimiter.release();
                            }
                        }
                        catch (Throwable innerException) {
                            asyncFailureHandler.accept(innerException);
                        }
                    }
                }, (Executor)this.executor);
                return null;
            }
            catch (InterruptedException ie) {
                log.warn("Exception while put Async requests", (Throwable)ie);
                executionResult.setUserException(ie);
                return executionResult;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Got result: object: {}", output);
        }
        executionResult.setResult(output);
        return executionResult;
    }

    private void processAsyncResultsInInputOrder(JavaInstanceRunnable.AsyncResultConsumer resultConsumer) throws Exception {
        AsyncFuncRequest asyncResult = this.pendingAsyncRequests.peek();
        while (asyncResult != null && asyncResult.getProcessResult().isDone()) {
            this.pendingAsyncRequests.remove(asyncResult);
            JavaExecutionResult execResult = new JavaExecutionResult();
            try {
                Object result = asyncResult.getProcessResult().get();
                execResult.setResult(result);
            }
            catch (ExecutionException e) {
                execResult.setUserException(FutureUtil.unwrapCompletionException((Throwable)e));
            }
            resultConsumer.accept(asyncResult.getRecord(), execResult);
            asyncResult = this.pendingAsyncRequests.peek();
        }
    }

    public void initialize() throws Exception {
        if (this.function != null) {
            this.function.initialize((Context)this.context);
        }
    }

    @Override
    public void close() {
        if (this.function != null) {
            try {
                this.function.close();
            }
            catch (Exception e) {
                log.error("function closeResource occurred exception", (Throwable)e);
            }
        }
        this.context.close();
        this.executor.shutdown();
    }

    public Map<String, Double> getAndResetMetrics() {
        return this.context.getAndResetMetrics();
    }

    public void resetMetrics() {
        this.context.resetMetrics();
    }

    public Map<String, Double> getMetrics() {
        return this.context.getMetrics();
    }

    @Generated
    ContextImpl getContext() {
        return this.context;
    }

    @Generated
    public LinkedBlockingQueue<AsyncFuncRequest> getPendingAsyncRequests() {
        return this.pendingAsyncRequests;
    }

    @Generated
    public Semaphore getAsyncRequestsConcurrencyLimiter() {
        return this.asyncRequestsConcurrencyLimiter;
    }

    public static class AsyncFuncRequest {
        private final Record record;
        private final CompletableFuture processResult;

        @Generated
        public AsyncFuncRequest(Record record, CompletableFuture processResult) {
            this.record = record;
            this.processResult = processResult;
        }

        @Generated
        public Record getRecord() {
            return this.record;
        }

        @Generated
        public CompletableFuture getProcessResult() {
            return this.processResult;
        }

        @Generated
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof AsyncFuncRequest)) {
                return false;
            }
            AsyncFuncRequest other = (AsyncFuncRequest)o;
            if (!other.canEqual(this)) {
                return false;
            }
            Record this$record = this.getRecord();
            Record other$record = other.getRecord();
            if (this$record == null ? other$record != null : !this$record.equals(other$record)) {
                return false;
            }
            CompletableFuture this$processResult = this.getProcessResult();
            CompletableFuture other$processResult = other.getProcessResult();
            return !(this$processResult == null ? other$processResult != null : !this$processResult.equals(other$processResult));
        }

        @Generated
        protected boolean canEqual(Object other) {
            return other instanceof AsyncFuncRequest;
        }

        @Generated
        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            Record $record = this.getRecord();
            result = result * 59 + ($record == null ? 43 : $record.hashCode());
            CompletableFuture $processResult = this.getProcessResult();
            result = result * 59 + ($processResult == null ? 43 : $processResult.hashCode());
            return result;
        }

        @Generated
        public String toString() {
            return "JavaInstance.AsyncFuncRequest(record=" + this.getRecord() + ", processResult=" + this.getProcessResult() + ")";
        }
    }
}

