package com.fnproject.fn.runtime.flow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fnproject.fn.api.FunctionInvoker;
import com.fnproject.fn.api.Headers;
import com.fnproject.fn.api.InputEvent;
import com.fnproject.fn.api.InvocationContext;
import com.fnproject.fn.api.InvocationListener;
import com.fnproject.fn.api.OutputEvent;
import com.fnproject.fn.api.exception.FunctionInputHandlingException;
import com.fnproject.fn.api.flow.Flow;
import com.fnproject.fn.api.flow.Flows;
import com.fnproject.fn.api.flow.PlatformException;
import com.fnproject.fn.runtime.exception.InternalFunctionInvocationException;
import com.fnproject.fn.runtime.exception.PlatformCommunicationException;
import com.fnproject.fn.runtime.flow.APIModel;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/* loaded from: input_file:com/fnproject/fn/runtime/flow/FlowContinuationInvoker.class */
public final class FlowContinuationInvoker implements FunctionInvoker {
    private static final String DEFAULT_COMPLETER_BASE_URL = "http://completer-svc:8081";
    private static final String COMPLETER_BASE_URL = "COMPLETER_BASE_URL";
    public static final String FLOW_ID_HEADER = "Fnproject-FlowId";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/fnproject/fn/runtime/flow/FlowContinuationInvoker$ContinuationOutputEvent.class */
    /**
     * Output event carrying the serialized result of a flow stage invocation.
     *
     * <p>The reported status is always {@code Success} and the only header set is a JSON
     * content type; the body bytes are written out unchanged.
     */
    public static final class ContinuationOutputEvent implements OutputEvent {
        /** Headers returned for every instance: a single JSON content-type entry. */
        private static final Headers JSON_HEADERS = Headers.emptyHeaders().setHeader("Content-Type", "application/json", new String[0]);

        /** Bytes emitted verbatim by {@link #writeToOutput(OutputStream)}. */
        private final byte[] body;

        /**
         * @param successful accepted but not stored; it has no effect on the reported status
         * @param payload    bytes to write as the response body
         */
        private ContinuationOutputEvent(boolean successful, byte[] payload) {
            this.body = payload;
        }

        /** Returns the shared JSON content-type headers. */
        public Headers getHeaders() {
            return JSON_HEADERS;
        }

        /** Always reports success, regardless of the flag passed to the constructor. */
        public OutputEvent.Status getStatus() {
            return OutputEvent.Status.Success;
        }

        /**
         * Writes the body bytes to the given stream.
         *
         * @param out destination stream
         * @throws IOException if the underlying write fails
         */
        public void writeToOutput(OutputStream out) throws IOException {
            out.write(this.body);
        }
    }

    /* loaded from: input_file:com/fnproject/fn/runtime/flow/FlowContinuationInvoker$DispatchPattern.class */
    /**
     * Describes one kind of invocable continuation object: how to recognise it, how many
     * arguments it takes, and which reflective method to call on it.
     */
    private interface DispatchPattern {
        /**
         * @return the number of arguments the continuation's invoke method expects
         */
        int numArguments();

        /**
         * @param candidate the continuation object to test
         * @return {@code true} if this pattern applies to {@code candidate}
         */
        boolean matches(Object candidate);

        /**
         * @param target the continuation object whose invoke method is wanted
         * @return the method to call reflectively on {@code target}
         */
        Method getInvokeMethod(Object target);
    }

    /* loaded from: input_file:com/fnproject/fn/runtime/flow/FlowContinuationInvoker$Dispatchers.class */
    /**
     * The functional-interface shapes a deserialized continuation may take.
     *
     * <p>Each constant pairs an interface type with the arity and name of its single
     * invoke method. Declaration order matters: callers that scan {@link #values()} pick
     * the first constant whose type the continuation implements.
     */
    private enum Dispatchers implements DispatchPattern {
        CallableDispatch(Callable.class, 0, "call"),
        FunctionDispatch(Function.class, 1, "apply"),
        BiFunctionDispatch(BiFunction.class, 2, "apply"),
        RunnableDispatch(Runnable.class, 0, "run"),
        ConsumerDispatch(Consumer.class, 1, "accept"),
        BiConsumerDispatch(BiConsumer.class, 2, "accept"),
        SupplierDispatch(Supplier.class, 0, "get");

        /** Interface a continuation must implement for this constant to match. */
        private final Class<?> matchType;
        /** Number of {@code Object} parameters of the invoke method. */
        private final int numArguments;
        /** Name of the invoke method looked up reflectively. */
        private final String methodName;

        Dispatchers(Class<?> matchType, int numArguments, String methodName) {
            this.matchType = matchType;
            this.numArguments = numArguments;
            this.methodName = methodName;
        }

        @Override // com.fnproject.fn.runtime.flow.FlowContinuationInvoker.DispatchPattern
        public boolean matches(Object obj) {
            return this.matchType.isInstance(obj);
        }

        @Override // com.fnproject.fn.runtime.flow.FlowContinuationInvoker.DispatchPattern
        public int numArguments() {
            return this.numArguments;
        }

        /**
         * Looks up the public method named {@code methodName} on the runtime class of
         * {@code obj}, taking {@code numArguments} parameters of type {@code Object}.
         *
         * @param obj the continuation object
         * @return the matching public method
         * @throws IllegalStateException if the method cannot be found or accessed; the
         *         underlying reflection failure is attached as the cause
         */
        @Override // com.fnproject.fn.runtime.flow.FlowContinuationInvoker.DispatchPattern
        public Method getInvokeMethod(Object obj) {
            // The lookup uses Object for every parameter, i.e. the erased signature.
            Class<?>[] parameterTypes = new Class<?>[this.numArguments];
            for (int i = 0; i < parameterTypes.length; i++) {
                parameterTypes[i] = Object.class;
            }
            try {
                return obj.getClass().getMethod(this.methodName, parameterTypes);
            } catch (NoSuchMethodException | SecurityException e) {
                // Keep the reflection failure as the cause so the real reason is not lost.
                throw new IllegalStateException("Unable to find method " + this.methodName + " on " + obj.getClass(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/fnproject/fn/runtime/flow/FlowContinuationInvoker$URLCompleterClientFactory.class */
    /**
     * {@link CompleterClientFactory} that lazily builds HTTP-backed clients rooted at a
     * single base URL. Both accessors are synchronized and cache the client they create.
     */
    public static class URLCompleterClientFactory implements CompleterClientFactory {
        /** Base URL to which the {@code /v1} and {@code /blobs} paths are appended. */
        private final String completerBaseUrl;
        /** Cached completer client; created on first request. */
        private transient CompleterClient completerClient;
        /** Cached blob store client; created on first request. */
        private transient BlobStoreClient blobClient;

        URLCompleterClientFactory(String baseUrl) {
            this.completerBaseUrl = baseUrl;
        }

        /** Returns the cached completer client, creating it against {@code <base>/v1} if absent. */
        @Override // com.fnproject.fn.runtime.flow.CompleterClientFactory
        public synchronized CompleterClient getCompleterClient() {
            if (this.completerClient != null) {
                return this.completerClient;
            }
            String apiUrl = this.completerBaseUrl + "/v1";
            BlobStoreClient blobs = getBlobStoreClient();
            this.completerClient = new RemoteFlowApiClient(apiUrl, blobs, new HttpClient());
            return this.completerClient;
        }

        /** Returns the cached blob store client, creating it against {@code <base>/blobs} if absent. */
        @Override // com.fnproject.fn.runtime.flow.CompleterClientFactory
        public synchronized BlobStoreClient getBlobStoreClient() {
            if (this.blobClient != null) {
                return this.blobClient;
            }
            String blobsUrl = this.completerBaseUrl + "/blobs";
            this.blobClient = new RemoteBlobStoreClient(blobsUrl, new HttpClient());
            return this.blobClient;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the globally registered completer client factory, first registering a
     * URL-based one if none is set.
     *
     * @param completerBaseUrl base URL used only when a new factory has to be created
     * @return the factory held by {@code FlowRuntimeGlobals} after this call
     */
    public static synchronized CompleterClientFactory getOrCreateCompleterClientFactory(String completerBaseUrl) {
        boolean noFactoryRegistered = FlowRuntimeGlobals.getCompleterClientFactory() == null;
        if (noFactoryRegistered) {
            URLCompleterClientFactory created = new URLCompleterClientFactory(completerBaseUrl);
            FlowRuntimeGlobals.setCompleterClientFactory(created);
        }
        return FlowRuntimeGlobals.getCompleterClientFactory();
    }

    /**
     * Handles an invocation either as the start of a flow or as a flow stage continuation.
     *
     * <p>When the input carries no {@code Fnproject-FlowId} header, a lazily-initialised
     * flow source is installed and an empty result is returned, so this invoker produces
     * no output itself. When the header is present, the request body is parsed as an
     * {@code InvokeStageRequest}, the serialized continuation is fetched from the blob
     * store and invoked, and its result is returned. In that case the current flow source
     * and completion id are always cleared before returning, whether or not the
     * invocation succeeded.
     *
     * @param invocationContext context of the current invocation; supplies runtime
     *                          configuration and accepts completion listeners
     * @param inputEvent        the incoming event (headers and body)
     * @return the continuation's output event, or empty if this is not a continuation call
     * @throws FunctionInputHandlingException if the closure is not a Java serialized
     *         object, cannot be deserialized, matches no supported functional interface,
     *         or is given the wrong number of arguments
     * @throws PlatformCommunicationException if the request body cannot be read or parsed
     */
    public Optional<OutputEvent> tryInvoke(final InvocationContext invocationContext, InputEvent inputEvent) {
        Optional<String> flowIdHeader = inputEvent.getHeaders().get(FLOW_ID_HEADER);
        final String completerBaseUrl = invocationContext.getRuntimeContext()
                .getConfigurationByKey(COMPLETER_BASE_URL)
                .orElse(DEFAULT_COMPLETER_BASE_URL);

        if (!flowIdHeader.isPresent()) {
            // Not a continuation: make a flow available on demand and produce no output here.
            Flows.setCurrentFlowSource(newFlowSource(invocationContext, completerBaseUrl));
            return Optional.empty();
        }

        final CompleterClientFactory clientFactory = getOrCreateCompleterClientFactory(completerBaseUrl);
        final FlowId flowId = new FlowId(flowIdHeader.get());
        Flows.setCurrentFlowSource(new Flows.FlowSource() {
            Flow runtime;

            public synchronized Flow currentFlow() {
                if (this.runtime == null) {
                    this.runtime = new RemoteFlow(flowId);
                }
                return this.runtime;
            }
        });

        try {
            return inputEvent.consumeBody(requestBody -> {
                try {
                    APIModel.InvokeStageRequest request = FlowRuntimeGlobals.getObjectMapper().readValue(requestBody, APIModel.InvokeStageRequest.class);
                    BlobStoreClient blobStoreClient = clientFactory.getBlobStoreClient();
                    FlowRuntimeGlobals.setCurrentCompletionId(new CompletionId(request.stageId));

                    // Constant-first comparison so a missing content type is rejected rather than causing an NPE.
                    if (!RemoteFlowApiClient.CONTENT_TYPE_JAVA_OBJECT.equals(request.closure.contentType)) {
                        throw new FunctionInputHandlingException("Content type of closure isn't a Java serialized object");
                    }

                    // NOTE(review): this is native Java deserialization of whatever the blob store
                    // returns; it is only safe if the blob store contents are trusted.
                    Object continuation = blobStoreClient.readBlob(flowId.getId(), request.closure.blobId, blobStream -> {
                        try (ObjectInputStream objectStream = new ObjectInputStream(blobStream)) {
                            return objectStream.readObject();
                        } catch (IOException | ClassNotFoundException e) {
                            throw new FunctionInputHandlingException("Error reading continuation content", e);
                        }
                    }, request.closure.contentType);

                    Dispatchers dispatcher = findDispatcher(continuation);
                    if (dispatcher.numArguments() != request.args.size()) {
                        throw new FunctionInputHandlingException("Number of arguments provided (" + request.args.size() + ") in .InvokeStageRequest does not match the number required by the function type (" + dispatcher.numArguments() + ")");
                    }

                    // Resolve the method before converting arguments, matching the original evaluation order.
                    Method invokeMethod = dispatcher.getInvokeMethod(continuation);
                    Object[] arguments = request.args.stream()
                            .map(arg -> arg.toJava(flowId, blobStoreClient, getClass().getClassLoader()))
                            .toArray();
                    OutputEvent result = invokeContinuation(blobStoreClient, flowId, continuation, invokeMethod, arguments);
                    return Optional.of(result);
                } catch (IOException e) {
                    throw new PlatformCommunicationException("Error reading continuation content", e);
                }
            });
        } finally {
            // Always clear per-invocation state, on success and on failure alike.
            Flows.setCurrentFlowSource((Flows.FlowSource) null);
            FlowRuntimeGlobals.setCurrentCompletionId(null);
        }
    }

    /**
     * Builds a flow source that creates a new remote flow on first use and registers a
     * listener that commits that flow when the invocation finishes, on success or failure.
     */
    private Flows.FlowSource newFlowSource(final InvocationContext invocationContext, final String completerBaseUrl) {
        return new Flows.FlowSource() {
            Flow runtime;

            public synchronized Flow currentFlow() {
                if (this.runtime == null) {
                    String functionId = invocationContext.getRuntimeContext().getFunctionID();
                    final CompleterClientFactory factory = getOrCreateCompleterClientFactory(completerBaseUrl);
                    final FlowId createdFlow = factory.getCompleterClient().createFlow(functionId);
                    this.runtime = new RemoteFlow(createdFlow);
                    invocationContext.addListener(new InvocationListener() {
                        public void onSuccess() {
                            factory.getCompleterClient().commit(createdFlow);
                        }

                        public void onFailure() {
                            factory.getCompleterClient().commit(createdFlow);
                        }
                    });
                }
                return this.runtime;
            }
        };
    }

    /**
     * Returns the first dispatcher, in declaration order, that matches the continuation.
     *
     * @throws FunctionInputHandlingException if no dispatcher matches
     */
    private static Dispatchers findDispatcher(Object continuation) {
        for (Dispatchers candidate : Dispatchers.values()) {
            if (candidate.matches(continuation)) {
                return candidate;
            }
        }
        throw new FunctionInputHandlingException("No functional interface type matches the supplied continuation class");
    }

    /**
     * Reflectively invokes the continuation and wraps the outcome in an output event.
     *
     * @param blobStoreClient blob store client passed through to datum conversion
     * @param flowId          flow the continuation belongs to
     * @param target          the continuation object to invoke
     * @param method          the method to call on {@code target}
     * @param args            arguments passed to {@code method}
     * @return an output event describing the continuation's return value
     * @throws InternalFunctionInvocationException if the continuation itself throws; it
     *         carries the original cause and an output event describing the failure
     * @throws PlatformException if any other exception occurs in the invocation path
     */
    private OutputEvent invokeContinuation(BlobStoreClient blobStoreClient, FlowId flowId, Object target, Method method, Object[] args) {
        try {
            method.setAccessible(true);
            Object returned = method.invoke(target, args);
            APIModel.Datum resultDatum = APIModel.datumFromJava(flowId, returned, blobStoreClient);
            return constructOutputEvent(resultDatum, true);
        } catch (InvocationTargetException invocationFailure) {
            // The continuation threw: report its own exception, not the reflection wrapper.
            Throwable cause = invocationFailure.getCause();
            APIModel.Datum errorDatum = APIModel.datumFromJava(flowId, cause, blobStoreClient);
            OutputEvent failureEvent = constructOutputEvent(errorDatum, false);
            throw new InternalFunctionInvocationException("Error invoking flows lambda", cause, failureEvent);
        } catch (Exception unexpected) {
            throw new PlatformException(unexpected);
        }
    }

    /**
     * Wraps a datum in an {@code InvokeStageResponse} and serializes it as the JSON body
     * of a {@link ContinuationOutputEvent}.
     *
     * @param datum the result (or error) value to report
     * @param z     whether the stage completed successfully; recorded in the response body
     * @return an output event whose body is the UTF-8 JSON encoding of the response
     * @throws PlatformException if the response cannot be serialized to JSON
     */
    private OutputEvent constructOutputEvent(APIModel.Datum datum, boolean z) {
        APIModel.CompletionResult completionResult = new APIModel.CompletionResult();
        completionResult.result = datum;
        completionResult.successful = z;
        APIModel.InvokeStageResponse invokeStageResponse = new APIModel.InvokeStageResponse();
        invokeStageResponse.result = completionResult;
        try {
            // Serialize straight to bytes: the mapper emits UTF-8, whereas String.getBytes()
            // would use the platform default charset and could corrupt non-ASCII content.
            byte[] json = FlowRuntimeGlobals.getObjectMapper().writeValueAsBytes(invokeStageResponse);
            return new ContinuationOutputEvent(z, json);
        } catch (JsonProcessingException e) {
            throw new PlatformException("Error writing JSON", e);
        }
    }
}
