/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.instance;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.StateStore;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.AbstractSinkRecord;
import org.apache.pulsar.functions.instance.ContextImpl;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.JavaExecutionResult;
import org.apache.pulsar.functions.instance.JavaInstance;
import org.apache.pulsar.functions.instance.LogAppender;
import org.apache.pulsar.functions.instance.OutputRecordSinkRecord;
import org.apache.pulsar.functions.instance.ProducerCache;
import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.SinkSchemaInfoProvider;
import org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl;
import org.apache.pulsar.functions.instance.state.InstanceStateManager;
import org.apache.pulsar.functions.instance.state.StateManager;
import org.apache.pulsar.functions.instance.state.StateStoreContextImpl;
import org.apache.pulsar.functions.instance.state.StateStoreProvider;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.com.scurrilous.circe.checksum.Crc32cIntChecksum;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.runtime.shaded.net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.MultiConsumerPulsarSource;
import org.apache.pulsar.functions.source.MultiConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSource;
import org.apache.pulsar.functions.source.SingleConsumerPulsarSourceConfig;
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.windowing.WindowFunctionExecutor;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaInstanceRunnable
implements AutoCloseable,
Runnable {
    private static final Logger log = LoggerFactory.getLogger(JavaInstanceRunnable.class);
    private final InstanceConfig instanceConfig;
    private final ClientBuilder clientBuilder;
    private final PulsarClientImpl client;
    private final PulsarAdmin pulsarAdmin;
    private LogAppender logAppender;
    private final String stateStorageImplClass;
    private final String stateStorageServiceUrl;
    private StateStoreProvider stateStoreProvider;
    private StateManager stateManager;
    private JavaInstance javaInstance;
    private Throwable deathException;
    private ComponentStatsManager stats;
    private Record<?> currentRecord;
    private Source source;
    private Sink sink;
    private final SecretsProvider secretsProvider;
    private FunctionCollectorRegistry collectorRegistry;
    private final String[] metricsLabels;
    private InstanceCache instanceCache;
    private final Function.FunctionDetails.ComponentType componentType;
    private final Map<String, String> properties;
    private final ClassLoader instanceClassLoader;
    private final ClassLoader componentClassLoader;
    private final ClassLoader functionClassLoader;
    private transient boolean isInitialized = false;
    private final ReadWriteLock statsLock = new ReentrantReadWriteLock();
    private Class<?> sinkTypeArg;
    private final AtomicReference<Schema<?>> sinkSchema = new AtomicReference();
    private SinkSchemaInfoProvider sinkSchemaInfoProvider = null;
    private final ProducerCache producerCache = new ProducerCache();

    public JavaInstanceRunnable(InstanceConfig instanceConfig, ClientBuilder clientBuilder, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, String stateStorageImplClass, String stateStorageServiceUrl, SecretsProvider secretsProvider, FunctionCollectorRegistry collectorRegistry, ClassLoader componentClassLoader, ClassLoader transformFunctionClassLoader) throws PulsarClientException {
        this.instanceConfig = instanceConfig;
        this.clientBuilder = clientBuilder;
        this.client = (PulsarClientImpl)pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.stateStorageImplClass = stateStorageImplClass;
        this.stateStorageServiceUrl = stateStorageServiceUrl;
        this.secretsProvider = secretsProvider;
        this.componentClassLoader = componentClassLoader;
        this.functionClassLoader = transformFunctionClassLoader != null ? transformFunctionClassLoader : componentClassLoader;
        this.metricsLabels = new String[]{instanceConfig.getFunctionDetails().getTenant(), String.format("%s/%s", instanceConfig.getFunctionDetails().getTenant(), instanceConfig.getFunctionDetails().getNamespace()), instanceConfig.getFunctionDetails().getName(), String.valueOf(instanceConfig.getInstanceId()), instanceConfig.getClusterName(), FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails())};
        this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
        this.properties = InstanceUtils.getProperties(this.componentType, FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.collectorRegistry = collectorRegistry;
        this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
    }

    private synchronized void setup() throws Exception {
        this.instanceCache = InstanceCache.getInstanceCache();
        if (this.collectorRegistry == null) {
            this.collectorRegistry = FunctionCollectorRegistry.getDefaultImplementation();
        }
        this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels, this.instanceCache.getScheduledExecutorService(), this.componentType);
        ThreadContext.put((String)"function", (String)FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
        ThreadContext.put((String)"functionname", (String)this.instanceConfig.getFunctionDetails().getName());
        ThreadContext.put((String)"instance", (String)this.instanceConfig.getInstanceName());
        log.info("Starting Java Instance {} : \n Details = {}", (Object)this.instanceConfig.getFunctionDetails().getName(), (Object)this.instanceConfig.getFunctionDetails());
        Object object = this.instanceConfig.getFunctionDetails().getClassName().equals(WindowFunctionExecutor.class.getName()) ? Reflections.createInstance(this.instanceConfig.getFunctionDetails().getClassName(), this.instanceClassLoader) : Reflections.createInstance(this.instanceConfig.getFunctionDetails().getClassName(), this.functionClassLoader);
        if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }
        this.setupStateStore();
        ContextImpl contextImpl = this.setupContext();
        this.setupOutput(contextImpl);
        this.setupInput(contextImpl);
        this.setupLogHandler();
        if (!(object instanceof IdentityFunction) && !(this.sink instanceof PulsarSink)) {
            this.sinkSchemaInfoProvider = new SinkSchemaInfoProvider();
        }
        this.javaInstance = new JavaInstance(contextImpl, object, this.instanceConfig);
        try {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            this.javaInstance.initialize();
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
        this.isInitialized = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    ContextImpl setupContext() throws PulsarClientException {
        Logger instanceLog = LoggerFactory.getILoggerFactory().getLogger("function-" + this.instanceConfig.getFunctionDetails().getName());
        ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(this.functionClassLoader);
            ContextImpl contextImpl = new ContextImpl(this.instanceConfig, instanceLog, this.client, this.secretsProvider, this.collectorRegistry, this.metricsLabels, this.componentType, this.stats, this.stateManager, this.pulsarAdmin, this.clientBuilder, this.producerCache);
            return contextImpl;
        }
        finally {
            Thread.currentThread().setContextClassLoader(clsLoader);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            try {
                this.setup();
                Thread currentThread = Thread.currentThread();
                Consumer<Throwable> asyncErrorHandler = throwable -> currentThread.interrupt();
                AsyncResultConsumer asyncResultConsumer = this::handleResult;
                while (true) {
                    this.currentRecord = this.readInput();
                    this.stats.incrTotalReceived();
                    if (this.instanceConfig.getFunctionDetails().getProcessingGuarantees() == Function.ProcessingGuarantees.ATMOST_ONCE && this.instanceConfig.getFunctionDetails().getAutoAck()) {
                        this.currentRecord.ack();
                    }
                    this.stats.setLastInvocation(System.currentTimeMillis());
                    this.stats.processTimeStart();
                    Thread.currentThread().setContextClassLoader(this.functionClassLoader);
                    JavaExecutionResult result = this.javaInstance.handleMessage(this.currentRecord, this.currentRecord.getValue(), asyncResultConsumer, asyncErrorHandler);
                    Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
                    this.stats.processTimeEnd();
                    if (result == null) continue;
                    this.handleResult(this.currentRecord, result);
                }
            }
            catch (Throwable t) {
                log.error("[{}] Uncaught exception in Java Instance", (Object)FunctionCommon.getFullyQualifiedInstanceId(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName(), this.instanceConfig.getInstanceId()), (Object)t);
                this.deathException = t;
                if (this.stats != null) {
                    this.stats.incrSysExceptions(t);
                }
                log.info("Closing instance");
                this.close();
            }
        }
        catch (Throwable throwable2) {
            log.info("Closing instance");
            this.close();
            throw throwable2;
        }
    }

    private void setupStateStore() throws Exception {
        this.stateManager = new InstanceStateManager();
        if (null == this.stateStorageServiceUrl) {
            this.stateStoreProvider = StateStoreProvider.NULL;
        } else {
            this.stateStoreProvider = this.getStateStoreProvider();
            HashMap<String, Object> stateStoreProviderConfig = new HashMap<String, Object>();
            stateStoreProviderConfig.put("stateStorageServiceUrl", this.stateStorageServiceUrl);
            this.stateStoreProvider.init(stateStoreProviderConfig, this.instanceConfig.getFunctionDetails());
            Object store = this.stateStoreProvider.getStateStore(this.instanceConfig.getFunctionDetails().getTenant(), this.instanceConfig.getFunctionDetails().getNamespace(), this.instanceConfig.getFunctionDetails().getName());
            StateStoreContextImpl context = new StateStoreContextImpl();
            store.init(context);
            this.stateManager.registerStore((StateStore)store);
        }
    }

    private StateStoreProvider getStateStoreProvider() throws Exception {
        if (this.stateStorageImplClass == null) {
            return new BKStateStoreProviderImpl();
        }
        return (StateStoreProvider)Class.forName(this.stateStorageImplClass).getConstructor(new Class[0]).newInstance(new Object[0]);
    }

    @VisibleForTesting
    void handleResult(Record srcRecord, JavaExecutionResult result) throws Exception {
        if (result.getUserException() != null) {
            Exception t = result.getUserException();
            log.warn("Encountered exception when processing message {}", (Object)srcRecord, (Object)t);
            this.stats.incrUserExceptions(t);
            srcRecord.fail();
        } else {
            if (result.getResult() != null) {
                this.sendOutputMessage(srcRecord, result.getResult());
            } else {
                Function.FunctionDetails functionDetails = this.instanceConfig.getFunctionDetails();
                if (!(functionDetails.getProcessingGuarantees() == Function.ProcessingGuarantees.MANUAL || functionDetails.getAutoAck() && functionDetails.getProcessingGuarantees() == Function.ProcessingGuarantees.ATMOST_ONCE)) {
                    srcRecord.ack();
                }
            }
            this.stats.incrTotalProcessedSuccessfully();
        }
    }

    private void sendOutputMessage(Record srcRecord, Object output) throws Exception {
        AbstractSinkRecord sinkRecord;
        if (this.componentType == Function.FunctionDetails.ComponentType.SINK) {
            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
        }
        if (output instanceof Record) {
            Record record = (Record)output;
            sinkRecord = this.sinkSchemaInfoProvider != null ? this.encodeWithRecordSchemaAndDecodeWithSinkSchema(srcRecord, record) : new OutputRecordSinkRecord(srcRecord, record);
        } else {
            sinkRecord = new SinkRecord<Object>(srcRecord, output);
        }
        try {
            this.sink.write(sinkRecord);
        }
        catch (Exception e) {
            log.info("Encountered exception in sink write: ", (Throwable)e);
            this.stats.incrSinkExceptions(e);
            srcRecord.fail();
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private OutputRecordSinkRecord encodeWithRecordSchemaAndDecodeWithSinkSchema(Record srcRecord, Record record) {
        Schema<Object> finalSchema;
        Schema<?> schema;
        KeyValueSchema kvSchema;
        Schema<Object> encodingSchema = record.getSchema();
        boolean isKeyValueSeparated = false;
        if (encodingSchema instanceof KeyValueSchema && (kvSchema = (KeyValueSchema)encodingSchema).getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
            encodingSchema = KeyValueSchemaImpl.of(kvSchema.getKeySchema(), kvSchema.getValueSchema());
            isKeyValueSeparated = true;
        }
        byte[] encoded = encodingSchema.encode(record.getValue());
        if (this.sinkSchema.get() == null) {
            schema = JavaInstanceRunnable.getSinkSchema(record, this.sinkTypeArg);
            schema.setSchemaInfoProvider(this.sinkSchemaInfoProvider);
            this.sinkSchema.compareAndSet(null, schema);
        }
        schema = this.sinkSchema.get();
        SchemaVersion schemaVersion = this.sinkSchemaInfoProvider.addSchemaIfNeeded(encodingSchema);
        byte[] schemaVersionBytes = schemaVersion.bytes();
        Object decoded = schema.decode(encoded, schemaVersionBytes);
        if (schema instanceof AutoConsumeSchema) {
            schema = ((AutoConsumeSchema)schema).getInternalSchema(schemaVersionBytes);
        }
        if (isKeyValueSeparated && schema instanceof KeyValueSchema) {
            KeyValueSchema kvSchema2 = (KeyValueSchema)schema;
            finalSchema = KeyValueSchemaImpl.of(kvSchema2.getKeySchema(), kvSchema2.getValueSchema(), KeyValueEncodingType.SEPARATED);
        } else {
            finalSchema = schema;
        }
        return new OutputRecordSinkRecord(srcRecord, record, decoded, finalSchema);
    }

    private Record readInput() throws Exception {
        Record record;
        if (this.componentType == Function.FunctionDetails.ComponentType.SOURCE) {
            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
        }
        try {
            record = this.source.read();
        }
        catch (Exception e) {
            if (this.stats != null) {
                this.stats.incrSourceExceptions(e);
            }
            log.error("Encountered exception in source read", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
        if (record == null) {
            throw new IllegalArgumentException("The record returned by the source cannot be null");
        }
        if (record.getValue() == null) {
            throw new IllegalArgumentException("The value in the record returned by the source cannot be null");
        }
        return record;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void close() {
        this.isInitialized = false;
        if (this.stats != null) {
            this.stats.close();
            this.stats = null;
        }
        if (this.source != null) {
            if (this.componentType == Function.FunctionDetails.ComponentType.SOURCE) {
                Thread.currentThread().setContextClassLoader(this.componentClassLoader);
            }
            try {
                this.source.close();
            }
            catch (Throwable e) {
                log.error("Failed to close source {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.source = null;
        }
        if (this.sink != null) {
            if (this.componentType == Function.FunctionDetails.ComponentType.SINK) {
                Thread.currentThread().setContextClassLoader(this.componentClassLoader);
            }
            try {
                this.sink.close();
            }
            catch (Throwable e) {
                log.error("Failed to close sink {}", (Object)this.instanceConfig.getFunctionDetails().getSource().getClassName(), (Object)e);
            }
            finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
            }
            this.sink = null;
        }
        if (null != this.javaInstance) {
            try {
                Thread.currentThread().setContextClassLoader(this.functionClassLoader);
                this.javaInstance.close();
            }
            finally {
                Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
                this.javaInstance = null;
            }
        }
        if (null != this.stateManager) {
            this.stateManager.close();
        }
        if (null != this.stateStoreProvider) {
            this.stateStoreProvider.close();
        }
        this.instanceCache = null;
        this.producerCache.close();
        if (this.logAppender != null) {
            this.removeLogTopicAppender(LoggerContext.getContext());
            this.removeLogTopicAppender(LoggerContext.getContext((boolean)false));
            this.logAppender.stop();
            this.logAppender = null;
        }
    }

    public String getStatsAsString() throws IOException {
        if (this.isInitialized) {
            this.statsLock.readLock().lock();
            try {
                String string = this.stats.getStatsAsString();
                return string;
            }
            finally {
                this.statsLock.readLock().unlock();
            }
        }
        return "";
    }

    public InstanceCommunication.MetricsData getAndResetMetrics() {
        if (this.isInitialized) {
            this.statsLock.writeLock().lock();
            try {
                InstanceCommunication.MetricsData metricsData = this.internalGetMetrics();
                this.internalResetMetrics();
                InstanceCommunication.MetricsData metricsData2 = metricsData;
                return metricsData2;
            }
            finally {
                this.statsLock.writeLock().unlock();
            }
        }
        return InstanceCommunication.MetricsData.getDefaultInstance();
    }

    public InstanceCommunication.MetricsData getMetrics() {
        if (this.isInitialized) {
            this.statsLock.readLock().lock();
            try {
                InstanceCommunication.MetricsData metricsData = this.internalGetMetrics();
                return metricsData;
            }
            finally {
                this.statsLock.readLock().unlock();
            }
        }
        return InstanceCommunication.MetricsData.getDefaultInstance();
    }

    public void resetMetrics() {
        if (this.isInitialized) {
            this.statsLock.writeLock().lock();
            try {
                this.internalResetMetrics();
            }
            finally {
                this.statsLock.writeLock().unlock();
            }
        }
    }

    private InstanceCommunication.MetricsData internalGetMetrics() {
        InstanceCommunication.MetricsData.Builder bldr = this.createMetricsDataBuilder();
        Map<String, Double> userMetrics = this.javaInstance.getMetrics();
        if (userMetrics != null) {
            bldr.putAllUserMetrics(userMetrics);
        }
        return bldr.build();
    }

    private void internalResetMetrics() {
        this.stats.reset();
        this.javaInstance.resetMetrics();
    }

    private InstanceCommunication.MetricsData.Builder createMetricsDataBuilder() {
        InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();
        if (this.stats != null) {
            bldr.setProcessedSuccessfullyTotal((long)this.stats.getTotalProcessedSuccessfully());
            bldr.setSystemExceptionsTotal((long)this.stats.getTotalSysExceptions());
            bldr.setUserExceptionsTotal((long)this.stats.getTotalUserExceptions());
            bldr.setReceivedTotal((long)this.stats.getTotalRecordsReceived());
            bldr.setAvgProcessLatency(this.stats.getAvgProcessLatency());
            bldr.setLastInvocation((long)this.stats.getLastInvocation());
            bldr.setProcessedSuccessfullyTotal1Min((long)this.stats.getTotalProcessedSuccessfully1min());
            bldr.setSystemExceptionsTotal1Min((long)this.stats.getTotalSysExceptions1min());
            bldr.setUserExceptionsTotal1Min((long)this.stats.getTotalUserExceptions1min());
            bldr.setReceivedTotal1Min((long)this.stats.getTotalRecordsReceived1min());
            bldr.setAvgProcessLatency1Min(this.stats.getAvgProcessLatency1min());
        }
        return bldr;
    }

    public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
        InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
        if (this.isInitialized) {
            this.statsLock.readLock().lock();
            try {
                functionStatusBuilder.setNumReceived((long)this.stats.getTotalRecordsReceived());
                functionStatusBuilder.setNumSuccessfullyProcessed((long)this.stats.getTotalProcessedSuccessfully());
                functionStatusBuilder.setNumUserExceptions((long)this.stats.getTotalUserExceptions());
                this.stats.getLatestUserExceptions().forEach(ex -> functionStatusBuilder.addLatestUserExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
                functionStatusBuilder.setNumSystemExceptions((long)this.stats.getTotalSysExceptions());
                this.stats.getLatestSystemExceptions().forEach(ex -> functionStatusBuilder.addLatestSystemExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
                this.stats.getLatestSourceExceptions().forEach(ex -> functionStatusBuilder.addLatestSourceExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
                this.stats.getLatestSinkExceptions().forEach(ex -> functionStatusBuilder.addLatestSinkExceptions((InstanceCommunication.FunctionStatus.ExceptionInformation)ex));
                functionStatusBuilder.setAverageLatency(this.stats.getAvgProcessLatency());
                functionStatusBuilder.setLastInvocationTime((long)this.stats.getLastInvocation());
            }
            finally {
                this.statsLock.readLock().unlock();
            }
        }
        return functionStatusBuilder;
    }

    private void setupLogHandler() {
        if (this.instanceConfig.getFunctionDetails().getLogTopic() != null && !this.instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
            new Crc32cIntChecksum();
            this.logAppender = new LogAppender(this.client, this.instanceConfig.getFunctionDetails().getLogTopic(), FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceName());
            this.logAppender.start();
            this.setupLogTopicAppender(LoggerContext.getContext());
            this.setupLogTopicAppender(LoggerContext.getContext((boolean)false));
        }
    }

    private void setupLogTopicAppender(LoggerContext context) {
        Configuration config = context.getConfiguration();
        config.addAppender((Appender)this.logAppender);
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.addAppender((Appender)this.logAppender, null, null);
        }
        config.getRootLogger().addAppender((Appender)this.logAppender, null, null);
        context.updateLoggers();
    }

    private void removeLogTopicAppender(LoggerContext context) {
        Configuration config = context.getConfiguration();
        for (LoggerConfig loggerConfig : config.getLoggers().values()) {
            loggerConfig.removeAppender(this.logAppender.getName());
        }
        config.getRootLogger().removeAppender(this.logAppender.getName());
        context.updateLoggers();
    }

    private void setupInput(ContextImpl contextImpl) throws Exception {
        Object object;
        Function.SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
        if (sourceSpec.getClassName().isEmpty()) {
            PulsarSourceConfig pulsarSourceConfig;
            TreeMap<String, ConsumerConfig> topicSchema = new TreeMap<String, ConsumerConfig>();
            sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
                ConsumerConfig consumerConfig = ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
                if (conf.getSchemaType() != null && !conf.getSchemaType().isEmpty()) {
                    consumerConfig.setSchemaType(conf.getSchemaType());
                } else if (conf.getSerdeClassName() != null && !conf.getSerdeClassName().isEmpty()) {
                    consumerConfig.setSerdeClassName(conf.getSerdeClassName());
                }
                consumerConfig.setSchemaProperties(conf.getSchemaPropertiesMap());
                consumerConfig.setConsumerProperties(conf.getConsumerPropertiesMap());
                if (conf.hasReceiverQueueSize()) {
                    consumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize().getValue());
                }
                if (conf.hasCryptoSpec()) {
                    consumerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
                }
                consumerConfig.setPoolMessages(conf.getPoolMessages());
                topicSchema.put((String)topic, consumerConfig);
            });
            sourceSpec.getTopicsToSerDeClassNameMap().forEach((topic, serde) -> topicSchema.put((String)topic, ConsumerConfig.builder().serdeClassName((String)serde).isRegexPattern(false).build()));
            if (!StringUtils.isEmpty(sourceSpec.getTopicsPattern())) {
                ((ConsumerConfig)topicSchema.get(sourceSpec.getTopicsPattern())).setRegexPattern(true);
            }
            if (topicSchema.size() == 1) {
                SingleConsumerPulsarSourceConfig singleConsumerPulsarSourceConfig = new SingleConsumerPulsarSourceConfig();
                Map.Entry entry = topicSchema.entrySet().iterator().next();
                singleConsumerPulsarSourceConfig.setTopic((String)entry.getKey());
                singleConsumerPulsarSourceConfig.setConsumerConfig((ConsumerConfig)entry.getValue());
                pulsarSourceConfig = singleConsumerPulsarSourceConfig;
            } else {
                MultiConsumerPulsarSourceConfig multiConsumerPulsarSourceConfig = new MultiConsumerPulsarSourceConfig();
                multiConsumerPulsarSourceConfig.setTopicSchema(topicSchema);
                pulsarSourceConfig = multiConsumerPulsarSourceConfig;
            }
            pulsarSourceConfig.setSubscriptionName(StringUtils.isNotBlank(sourceSpec.getSubscriptionName()) ? sourceSpec.getSubscriptionName() : InstanceUtils.getDefaultSubscriptionName(this.instanceConfig.getFunctionDetails()));
            pulsarSourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
            pulsarSourceConfig.setSubscriptionPosition(FunctionCommon.convertFromFunctionDetailsSubscriptionPosition(sourceSpec.getSubscriptionPosition()));
            pulsarSourceConfig.setSkipToLatest(sourceSpec.getSkipToLatest());
            Objects.requireNonNull(contextImpl.getSubscriptionType());
            pulsarSourceConfig.setSubscriptionType(contextImpl.getSubscriptionType());
            pulsarSourceConfig.setTypeClassName(sourceSpec.getTypeClassName());
            if (sourceSpec.getTimeoutMs() > 0L) {
                pulsarSourceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
            }
            if (sourceSpec.getNegativeAckRedeliveryDelayMs() > 0L) {
                pulsarSourceConfig.setNegativeAckRedeliveryDelayMs(sourceSpec.getNegativeAckRedeliveryDelayMs());
            }
            if (this.instanceConfig.getFunctionDetails().hasRetryDetails()) {
                pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
            }
            object = pulsarSourceConfig instanceof SingleConsumerPulsarSourceConfig ? new SingleConsumerPulsarSource((PulsarClient)this.client, (SingleConsumerPulsarSourceConfig)pulsarSourceConfig, this.properties, this.functionClassLoader) : new MultiConsumerPulsarSource((PulsarClient)this.client, (MultiConsumerPulsarSourceConfig)pulsarSourceConfig, this.properties, this.functionClassLoader);
        } else {
            object = sourceSpec.getClassName().equals(BatchSourceExecutor.class.getName()) ? Reflections.createInstance(sourceSpec.getClassName(), this.instanceClassLoader) : Reflections.createInstance(sourceSpec.getClassName(), this.componentClassLoader);
        }
        if (object instanceof Source) {
            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass());
            assert (typeArgs.length > 0);
        } else {
            throw new RuntimeException("Source does not implement correct interface");
        }
        this.source = (Source)object;
        if (this.componentType == Function.FunctionDetails.ComponentType.SOURCE) {
            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
        }
        try {
            if (sourceSpec.getConfigs().isEmpty()) {
                this.source.open(new HashMap<String, Object>(), contextImpl);
            } else {
                this.source.open((Map)ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>(){}).readValue(sourceSpec.getConfigs()), contextImpl);
            }
            if (this.source instanceof PulsarSource) {
                contextImpl.setInputConsumers(((PulsarSource)this.source).getInputConsumers());
            }
        }
        catch (Exception e) {
            log.error("Source open produced uncaught exception: ", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private void setupOutput(ContextImpl contextImpl) throws Exception {
        PulsarSink object;
        Function.SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
        if (sinkSpec.getClassName().isEmpty()) {
            if (StringUtils.isEmpty(sinkSpec.getTopic())) {
                object = PulsarSinkDisable.INSTANCE;
            } else {
                PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
                pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
                pulsarSinkConfig.setTopic(sinkSpec.getTopic());
                pulsarSinkConfig.setForwardSourceMessageProperty(this.instanceConfig.getFunctionDetails().getSink().getForwardSourceMessageProperty());
                if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
                    pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
                } else if (!StringUtils.isEmpty(sinkSpec.getSerDeClassName())) {
                    pulsarSinkConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
                }
                pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
                pulsarSinkConfig.setSchemaProperties(sinkSpec.getSchemaPropertiesMap());
                if (this.instanceConfig.getFunctionDetails().getSink().getProducerSpec() != null) {
                    Function.ProducerSpec conf = this.instanceConfig.getFunctionDetails().getSink().getProducerSpec();
                    ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder().maxPendingMessages(conf.getMaxPendingMessages()).maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions()).batchBuilder(conf.getBatchBuilder()).useThreadLocalProducers(conf.getUseThreadLocalProducers()).cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec())).compressionType(FunctionCommon.convertFromFunctionDetailsCompressionType(conf.getCompressionType()));
                    pulsarSinkConfig.setProducerConfig(builder.build());
                }
                object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader, this.producerCache);
            }
        } else {
            object = Reflections.createInstance(sinkSpec.getClassName(), this.componentClassLoader);
        }
        if (!(object instanceof Sink)) {
            throw new RuntimeException("Sink does not implement correct interface");
        }
        this.sink = object;
        this.sinkTypeArg = TypeResolver.resolveRawArguments(Sink.class, object.getClass())[0];
        if (this.componentType == Function.FunctionDetails.ComponentType.SINK) {
            Thread.currentThread().setContextClassLoader(this.componentClassLoader);
        }
        try {
            if (sinkSpec.getConfigs().isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("Opening Sink with empty hashmap with contextImpl: {} ", (Object)contextImpl.toString());
                }
                this.sink.open(new HashMap<String, Object>(), contextImpl);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Opening Sink with SinkSpec {} and contextImpl: {} ", (Object)sinkSpec, (Object)contextImpl.toString());
                }
                this.sink.open((Map)ObjectMapperFactory.getMapper().reader().forType(new TypeReference<Map<String, Object>>(){}).readValue(sinkSpec.getConfigs()), contextImpl);
            }
        }
        catch (Exception e) {
            log.error("Sink open produced uncaught exception: ", (Throwable)e);
            throw e;
        }
        finally {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {
        SchemaType type = JavaInstanceRunnable.getSchemaTypeOrDefault(record, clazz);
        switch (type) {
            case NONE: {
                if (ByteBuffer.class.isAssignableFrom(clazz)) {
                    return Schema.BYTEBUFFER;
                }
                return Schema.BYTES;
            }
            case AUTO_CONSUME: 
            case AUTO: {
                return Schema.AUTO_CONSUME();
            }
            case STRING: {
                return Schema.STRING;
            }
            case AVRO: {
                return AvroSchema.of(SchemaDefinition.builder().withPojo(clazz).build());
            }
            case JSON: {
                return JSONSchema.of(SchemaDefinition.builder().withPojo(clazz).build());
            }
            case KEY_VALUE: {
                return Schema.KV_BYTES();
            }
            case PROTOBUF: {
                return ProtobufSchema.ofGenericClass(clazz, new HashMap<String, String>());
            }
            case PROTOBUF_NATIVE: {
                return ProtobufNativeSchema.ofGenericClass(clazz, new HashMap<String, String>());
            }
            case AUTO_PUBLISH: {
                return Schema.AUTO_PRODUCE_BYTES();
            }
        }
        throw new RuntimeException("Unsupported schema type" + type);
    }

    private static SchemaType getSchemaTypeOrDefault(Record<?> record, Class<?> clazz) {
        if (GenericObject.class.isAssignableFrom(clazz)) {
            return SchemaType.AUTO_CONSUME;
        }
        if (byte[].class.equals(clazz) || ByteBuf.class.equals(clazz) || ByteBuffer.class.equals(clazz)) {
            return SchemaType.NONE;
        }
        Schema<?> schema = record.getSchema();
        if (schema != null) {
            if (schema.getSchemaInfo().getType() == SchemaType.NONE) {
                return JavaInstanceRunnable.getDefaultSchemaType(clazz);
            }
            return schema.getSchemaInfo().getType();
        }
        return JavaInstanceRunnable.getDefaultSchemaType(clazz);
    }

    private static SchemaType getDefaultSchemaType(Class<?> clazz) {
        if (byte[].class.equals(clazz) || ByteBuf.class.equals(clazz) || ByteBuffer.class.equals(clazz)) {
            return SchemaType.NONE;
        }
        if (GenericObject.class.isAssignableFrom(clazz)) {
            return SchemaType.AUTO_CONSUME;
        }
        if (String.class.equals(clazz)) {
            return SchemaType.STRING;
        }
        if (JavaInstanceRunnable.isProtobufClass(clazz)) {
            return SchemaType.PROTOBUF;
        }
        if (KeyValue.class.equals(clazz)) {
            return SchemaType.KEY_VALUE;
        }
        return SchemaType.JSON;
    }

    private static boolean isProtobufClass(Class<?> pojoClazz) {
        try {
            Class<?> protobufBaseClass = Class.forName("org.apache.pulsar.functions.runtime.shaded.com.google.protobuf.GeneratedMessageV3");
            return protobufBaseClass.isAssignableFrom(pojoClazz);
        }
        catch (ClassNotFoundException | NoClassDefFoundError e) {
            return false;
        }
    }

    public Throwable getDeathException() {
        return this.deathException;
    }

    public static interface AsyncResultConsumer {
        public void accept(Record var1, JavaExecutionResult var2) throws Exception;
    }
}

