package com.insyde.flink.statefun;

import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Address;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:com/insyde/flink/statefun/ExceptionHandlingFunctionWrapper.class */
/**
 * A {@link FunctionWrapper} decorator that logs every incoming message, tags the log output
 * produced while the message is being dispatched with MDC entries, and converts exceptions
 * thrown synchronously by the wrapped function into a logged error followed by
 * {@code context.done()}.
 *
 * <p>Only exceptions thrown directly by the delegate's {@code apply} call are intercepted. A
 * future returned by the delegate is handed back to the caller unchanged, so a future that
 * later completes exceptionally is not handled by this class.
 */
public class ExceptionHandlingFunctionWrapper implements FunctionWrapper {
    private static final Logger log = LoggerFactory.getLogger(ExceptionHandlingFunctionWrapper.class);

    /** MDC key holding the id of the function instance the message is addressed to. */
    private static final String FUNCTION_ID_LOG_MARKER = "function_id";

    /** MDC key holding a random UUID generated for each {@link #apply} invocation. */
    private static final String MESSAGE_ID = "message_id";

    /** The wrapper every call is delegated to. */
    private final FunctionWrapper functionWrapper;

    /**
     * Creates a wrapper around the given delegate.
     *
     * @param functionWrapper the wrapper to delegate to; must not be {@code null}
     * @throws NullPointerException if {@code functionWrapper} is {@code null}
     */
    public ExceptionHandlingFunctionWrapper(FunctionWrapper functionWrapper) {
        // Fail fast: a null delegate would otherwise surface later as an NPE that apply()
        // catches and swallows.
        this.functionWrapper = Objects.requireNonNull(functionWrapper, "functionWrapper");
    }

    /**
     * Dispatches {@code message} to the wrapped function with the {@code function_id} and
     * {@code message_id} MDC entries set for the duration of the call.
     *
     * @param context the invocation context of the addressed function
     * @param message the incoming message
     * @return the future returned by the delegate, or {@code context.done()} if the delegate
     *     threw an {@link Exception}
     */
    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        MDC.put(FUNCTION_ID_LOG_MARKER, context.self().id());
        MDC.put(MESSAGE_ID, UUID.randomUUID().toString());
        try {
            log.info("Incoming message {} for function {} received", message.valueTypeName(), context.self());
            return applyHandlingExceptions(context, message);
        } finally {
            // Remove BOTH keys on every exit path so neither value leaks into unrelated log
            // statements emitted later on the same thread.
            MDC.remove(FUNCTION_ID_LOG_MARKER);
            MDC.remove(MESSAGE_ID);
        }
    }

    /**
     * Invokes the delegate and turns a synchronously thrown {@link Exception} into a logged
     * error plus {@code context.done()}. Errors that are not {@link Exception}s propagate.
     */
    private CompletableFuture<Void> applyHandlingExceptions(Context context, Message message) {
        try {
            return this.functionWrapper.apply(context, message);
        } catch (Exception e) {
            Address self = context.self();
            TypeName type = self.type();
            // The trailing throwable has no placeholder, so SLF4J logs it with its stack trace.
            log.error("Failed to execute function {}.{}:{}", type.namespace(), type.name(), self.id(), e);
            log.info("Message handled");
            return context.done();
        }
    }

    /**
     * Builds a spec that carries the delegate spec's type name and known value specs but whose
     * supplier returns this wrapper instead of the delegate.
     *
     * @return the spec describing this wrapper
     */
    @Override
    public StatefulFunctionSpec createSpec() {
        StatefulFunctionSpec delegateSpec = this.functionWrapper.createSpec();
        return StatefulFunctionSpec.builder(delegateSpec.typeName())
                .withSupplier(() -> this)
                .withValueSpecs(delegateSpec.knownValues().values().toArray(ValueSpec<?>[]::new))
                .build();
    }

    /**
     * Returns the function wrapped by the delegate.
     *
     * @return the result of the delegate's {@code getWrappedFunction()}
     */
    @Override
    public StatefulFunction getWrappedFunction() {
        return this.functionWrapper.getWrappedFunction();
    }
}
