package com.insyde.flink.statefun;

import com.insyde.flink.statefun.dispatcher.MessageDispatcher;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.Message;

/* loaded from: input_file:com/insyde/flink/statefun/DispatchableFunctionWrapper.class */
/**
 * {@link FunctionWrapper} that routes every incoming message for a wrapped
 * {@link StatefulFunction} through a {@link MessageDispatcher}, and that can
 * describe itself as a {@link StatefulFunctionSpec}.
 *
 * <p>The spec is assembled reflectively: the {@link ValueSpec} instances held in
 * the fields declared directly on the wrapped function's class are registered
 * with the spec builder.
 */
public class DispatchableFunctionWrapper implements FunctionWrapper {
    private final StatefulFunction wrappedFunction;
    private final MessageDispatcher dispatcher;

    /**
     * Creates a wrapper around the given function.
     *
     * @param statefulFunction  the function that messages are dispatched to
     * @param messageDispatcher the dispatcher used to route each message
     * @throws NullPointerException if either argument is {@code null}
     */
    public DispatchableFunctionWrapper(StatefulFunction statefulFunction, MessageDispatcher messageDispatcher) {
        // Fail at construction time rather than on the first message or spec creation.
        this.wrappedFunction = Objects.requireNonNull(statefulFunction, "statefulFunction");
        this.dispatcher = Objects.requireNonNull(messageDispatcher, "messageDispatcher");
    }

    /**
     * Hands the message to the dispatcher together with the wrapped function and
     * returns the return value of the dispatch result.
     *
     * @param context the invocation context, passed through to the dispatcher
     * @param message the incoming message, passed through to the dispatcher
     * @return the dispatch result's return value, cast to a future
     * @throws java.util.NoSuchElementException if the dispatcher yields no result,
     *         or the result carries a {@code null} return value
     * @throws ClassCastException if the return value is not a {@link CompletableFuture}
     */
    @Override
    @SuppressWarnings("unchecked") // the element type of the returned future cannot be checked at runtime
    public CompletableFuture<Void> apply(Context context, Message message) {
        Object returnValue = this.dispatcher.dispatch(context, message, this.wrappedFunction)
                .map(result -> result.getReturnValue())
                .orElseThrow();
        return (CompletableFuture<Void>) returnValue;
    }

    /**
     * Builds the spec for the wrapped function: the type name is derived from the
     * wrapped function's class, the value specs come from its declared fields, and
     * this wrapper is supplied as the function instance.
     *
     * @return the spec describing the wrapped function
     * @throws IllegalStateException if a candidate field cannot be read reflectively
     */
    @Override
    public StatefulFunctionSpec createSpec() {
        ValueSpec<?>[] valueSpecs = declaredValueSpecs();
        TypeName typeName = TypeNameUtil.typeName(this.wrappedFunction.getClass());
        return StatefulFunctionSpec.builder(typeName)
                .withValueSpecs(valueSpecs)
                .withSupplier(() -> this)
                .build();
    }

    /** Returns the wrapped function exactly as it was passed to the constructor. */
    @Override
    public StatefulFunction getWrappedFunction() {
        return this.wrappedFunction;
    }

    /** Collects the {@link ValueSpec} values held by the fields declared on the wrapped function's class. */
    private ValueSpec<?>[] declaredValueSpecs() {
        return Arrays.stream(this.wrappedFunction.getClass().getDeclaredFields())
                // Candidate fields are those whose declared type can hold a ValueSpec;
                // this also admits supertypes such as Object.
                .filter(field -> field.getType().isAssignableFrom(ValueSpec.class))
                .map(this::getValue)
                // isInstance is false for null, so unset fields are skipped instead of
                // failing, and values of any other type are dropped.
                .filter(ValueSpec.class::isInstance)
                .map(value -> (ValueSpec<?>) value)
                .toArray(ValueSpec<?>[]::new);
    }

    /** Reads the field's current value from the wrapped function, converting access failures to unchecked. */
    private Object getValue(Field field) {
        try {
            return field.get(this.wrappedFunction);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(
                    "Cannot read field '" + field.getName() + "' of "
                            + this.wrappedFunction.getClass().getName()
                            + "; the field must be accessible to the wrapper",
                    e);
        }
    }
}
