package org.apache.nifi.processors.hive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import javax.security.auth.login.LoginException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
import org.apache.nifi.util.hive.ValidationResources;
import org.xerial.snappy.Snappy;

@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. NOTE: If multiple concurrent tasks are configured for this processor, only one table can be written to at any time by a single thread. Additional tasks intending to write to the same table will wait for the current task to finish writing to the table.")
@RequiresInstanceClassLoading
@Tags({"hive", "streaming", "put", "database", "store"})
@WritesAttributes({@WritesAttribute(attribute = PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR, description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."), @WritesAttribute(attribute = "query.output.tables", description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")})
/* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming.class */
public class PutHiveStreaming extends AbstractSessionFactoryProcessor {
    // Name of the environment variable quoted in the customValidate() error message about explicit keytabs.
    // NOTE(review): the code that actually reads this variable is not in the visible part of the file.
    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
    // Flow file attribute carrying the number of records written to the success / failure flow file.
    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    // Hive configuration key that setup() sets to true when more than one concurrent task is configured.
    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    /**
     * Validator accepting integer values of 2 or more.
     *
     * <p>If the property supports Expression Language and the value contains an expression, the value
     * is accepted as-is because it cannot be evaluated at validation time. Otherwise the value must
     * parse as an {@code int} and be at least 2; the result's explanation holds the reason for a
     * rejection and is {@code null} for a valid value.
     */
    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, input, context) -> {
        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
            return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
        }
        // Kept separate from the lambda parameters: a local may not redeclare a lambda parameter,
        // and the result must report the property name as subject and this text as explanation.
        String reason = null;
        try {
            if (Integer.parseInt(input) < 2) {
                reason = "value is less than 2";
            }
        } catch (NumberFormatException ignored) {
            // The parse failure itself is the validation finding; it is reported through the result.
            reason = "value is not a valid integer";
        }
        return new ValidationResult.Builder().subject(subject).input(input).explanation(reason).valid(reason == null).build();
    };
    // Avro metadata keys that FunctionContext.initAvroWriter() does not copy from the incoming file.
    // Blank final: assigned in a static initializer that is outside the visible part of the file.
    private static final Set<String> RESERVED_METADATA;
    // Processor properties; registered in init() in this order. All are blank finals assigned elsewhere.
    public static final PropertyDescriptor METASTORE_URI;
    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES;
    public static final PropertyDescriptor DB_NAME;
    public static final PropertyDescriptor TABLE_NAME;
    public static final PropertyDescriptor PARTITION_COLUMNS;
    public static final PropertyDescriptor AUTOCREATE_PARTITIONS;
    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS;
    public static final PropertyDescriptor HEARTBEAT_INTERVAL;
    public static final PropertyDescriptor TXNS_PER_BATCH;
    public static final PropertyDescriptor RECORDS_PER_TXN;
    public static final PropertyDescriptor CALL_TIMEOUT;
    public static final PropertyDescriptor ROLLBACK_ON_FAILURE;
    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE;
    // Relationships exposed by the processor; collected into 'relationships' by init().
    public static final Relationship REL_SUCCESS;
    public static final Relationship REL_FAILURE;
    public static final Relationship REL_RETRY;
    // Unmodifiable views built once in init() and returned by the corresponding getters.
    private List<PropertyDescriptor> propertyDescriptors;
    private Set<Relationship> relationships;
    // Created in init() from the Kerberos configuration file supplied by the initialization context.
    protected KerberosProperties kerberosProperties;
    // Result of hiveConfigurator.authenticate() in setup(); null when Hive security is not enabled.
    protected volatile UserGroupInformation ugi;
    // Hive configuration loaded in setup() from the configured resources plus dynamic properties.
    protected volatile HiveConf hiveConfig;
    // CALL_TIMEOUT property value multiplied by 1000 in setup().
    protected volatile int callTimeout;
    // Single-thread fixed pool created in setup(); threads are named "put-hive-streaming-%d".
    protected ExecutorService callTimeoutPool;
    // Timer created in setup() and handed to setupHeartBeatTimer() (defined outside this view).
    protected transient Timer heartBeatTimer;
    private volatile File kerberosConfigFile = null;
    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    // Kerberos user created in setup(); cleared to null when Hive security is not enabled.
    protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference<>();
    // Set to true at the end of setup(). NOTE(review): presumably read by the heartbeat task - confirm.
    protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    // Every onTrigger invocation that acquires its table semaphore adds its own endpoint-to-writer map here.
    protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> threadWriterList = new ConcurrentLinkedQueue<>();
    // One single-permit semaphore per "database.table" key, created on first use in onTrigger.
    protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap = new ConcurrentHashMap<>();
    // Passed to hiveConfigurator.validate() from customValidate().
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.processors.hive.PutHiveStreaming$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming$3.class */
    /**
     * Compiler-generated lookup table (recovered by the decompiler) that backs a {@code switch}
     * over {@link ErrorTypes.Destination}. The array is indexed by the enum constant's ordinal and
     * holds the case number used by the switch in {@code onHiveRecordsError}:
     * Failure = 1, ProcessException = 2, Retry = 3, Self = 4.
     */
    public static /* synthetic */ class AnonymousClass3 {
        // Sized to the number of Destination constants; unassigned slots keep the int default of 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination = new int[ErrorTypes.Destination.values().length];

        static {
            // Each constant is registered in its own try block: if a constant is missing at run time,
            // the NoSuchFieldError is ignored and that slot stays 0, which matches no numbered case
            // and therefore takes the switch's default branch.
            try {
                $SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[ErrorTypes.Destination.Failure.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[ErrorTypes.Destination.ProcessException.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[ErrorTypes.Destination.Retry.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[ErrorTypes.Destination.Self.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming$FunctionContext.class */
    /**
     * Per-invocation state for one incoming flow file: the two derived flow files (records that
     * were written to Hive and records that were not), the Avro writers and serialized Avro headers
     * used to append to them, and the record counters.
     */
    public static class FunctionContext extends RollbackOnFailure {
        private AtomicReference<FlowFile> successFlowFile;
        private AtomicReference<FlowFile> failureFlowFile;
        private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
        private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
        private byte[] successAvroHeader;
        private byte[] failureAvroHeader;
        private final AtomicInteger recordCount = new AtomicInteger(0);
        private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
        private final AtomicInteger failedRecordCount = new AtomicInteger(0);
        private final ComponentLog logger;

        /**
         * @param rollbackOnFailure forwarded as the first argument of the {@link RollbackOnFailure} constructor
         * @param log               logger used to report problems while appending Avro records
         */
        private FunctionContext(boolean rollbackOnFailure, ComponentLog log) {
            super(rollbackOnFailure, false);
            this.logger = log;
        }

        /** Stores the flow files that will receive the successful and the failed records. */
        public void setFlowFiles(FlowFile success, FlowFile failure) {
            this.successFlowFile = new AtomicReference<>(success);
            this.failureFlowFile = new AtomicReference<>(failure);
        }

        /**
         * Configures {@code writer} with the given codec and the non-reserved metadata of
         * {@code reader}, writes an Avro header for the reader's schema to the referenced flow
         * file, and returns the bytes of that header.
         */
        private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader, DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
            writer.setCodec(CodecFactory.fromString(codec));
            for (String metaKey : reader.getMetaKeys()) {
                if (PutHiveStreaming.RESERVED_METADATA.contains(metaKey)) {
                    continue;
                }
                writer.setMeta(metaKey, reader.getMeta(metaKey));
            }

            final ByteArrayOutputStream headerBuffer = new ByteArrayOutputStream();
            final FlowFile withHeader = session.append(flowFileRef.get(), out -> {
                // Create and immediately close the writer so the buffer holds only the header.
                writer.create(reader.getSchema(), headerBuffer);
                writer.close();
                out.write(headerBuffer.toByteArray());
            });
            flowFileRef.set(withHeader);
            return headerBuffer.toByteArray();
        }

        /** Initializes both Avro writers from the incoming stream and remembers their headers. */
        public void initAvroWriters(ProcessSession processSession, String str, DataFileStream<GenericRecord> dataFileStream) {
            this.successAvroHeader = initAvroWriter(processSession, str, dataFileStream, this.successAvroWriter, this.successFlowFile);
            this.failureAvroHeader = initAvroWriter(processSession, str, dataFileStream, this.failureAvroWriter, this.failureFlowFile);
        }

        /**
         * Appends the Avro form of {@code records} to the referenced flow file. The writer is
         * re-opened in append mode against the stored header, and is closed afterwards even when
         * {@code records} is {@code null}. An {@link IOException} raised while appending individual
         * records is logged and not propagated.
         */
        private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> records) {
            final FlowFile appended = session.append(flowFileRef.get(), out -> {
                if (records != null) {
                    writer.appendTo(new SeekableByteArrayInput(avroHeader), out);
                    try {
                        for (HiveStreamingRecord hiveRecord : records) {
                            writer.append(hiveRecord.getRecord());
                        }
                    } catch (IOException ioe) {
                        this.logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe);
                    }
                }
                writer.close();
            });
            flowFileRef.set(appended);
        }

        /** Appends the records to the success flow file and adds their number to the success counter. */
        public void appendRecordsToSuccess(ProcessSession processSession, List<HiveStreamingRecord> list) {
            appendAvroRecords(processSession, this.successAvroHeader, this.successAvroWriter, this.successFlowFile, list);
            this.successfulRecordCount.addAndGet(list.size());
        }

        /** Appends the records to the failure flow file and adds their number to the failure counter. */
        public void appendRecordsToFailure(ProcessSession processSession, List<HiveStreamingRecord> list) {
            appendAvroRecords(processSession, this.failureAvroHeader, this.failureAvroWriter, this.failureFlowFile, list);
            this.failedRecordCount.addAndGet(list.size());
        }

        /**
         * Routes the success and failure flow files that received at least one record (after
         * stamping them with the record count and table name), removes the ones that received
         * none, and finally transfers everything accumulated in {@code routingResult}.
         */
        public void transferFlowFiles(ProcessSession processSession, RoutingResult routingResult, HiveOptions hiveOptions) {
            final int succeeded = this.successfulRecordCount.get();
            if (succeeded <= 0) {
                processSession.remove(this.successFlowFile.get());
            } else {
                this.successFlowFile.set(processSession.putAllAttributes(this.successFlowFile.get(), resultAttributes(succeeded, hiveOptions)));
                processSession.getProvenanceReporter().send(this.successFlowFile.get(), hiveOptions.getMetaStoreURI());
                routingResult.routeTo(this.successFlowFile.get(), PutHiveStreaming.REL_SUCCESS);
            }

            final int failed = this.failedRecordCount.get();
            if (failed <= 0) {
                processSession.remove(this.failureFlowFile.get());
            } else {
                this.failureFlowFile.set(processSession.putAllAttributes(this.failureFlowFile.get(), resultAttributes(failed, hiveOptions)));
                routingResult.routeTo(this.failureFlowFile.get(), PutHiveStreaming.REL_FAILURE);
            }

            routingResult.getRoutedFlowFiles().forEach((relationship, flowFiles) -> processSession.transfer(flowFiles, relationship));
        }

        /** Builds the record-count and output-table attributes written on a result flow file. */
        private static Map<String, String> resultAttributes(int count, HiveOptions hiveOptions) {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(count));
            attributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, hiveOptions.getQualifiedTableName());
            return attributes;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming$HiveStreamingRecord.class */
    /** Pairs an Avro record with the partition values that were supplied alongside it. */
    public class HiveStreamingRecord {
        private List<String> partitionValues;
        private GenericRecord record;

        /**
         * @param partitionValues partition values for the record; callers in this file may pass {@code null}
         * @param record          the Avro record itself
         */
        public HiveStreamingRecord(List<String> partitionValues, GenericRecord record) {
            this.record = record;
            this.partitionValues = partitionValues;
        }

        /** @return the partition values given at construction time */
        public List<String> getPartitionValues() {
            return partitionValues;
        }

        /** @return the Avro record given at construction time */
        public GenericRecord getRecord() {
            return record;
        }
    }

    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming$ShouldRetryException.class */
    /** Unchecked exception thrown by the error handler when the destination is {@code Retry}. */
    private static class ShouldRetryException extends RuntimeException {
        /**
         * @param message description of the failure
         * @param cause   the exception that triggered the retry decision
         */
        private ShouldRetryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Builds the immutable property descriptor list and relationship set, and creates the
     * Kerberos properties from the configuration file supplied by the initialization context.
     */
    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.kerberosConfigFile = processorInitializationContext.getKerberosConfigurationFile();
        this.kerberosProperties = new KerberosProperties(this.kerberosConfigFile);

        // Order matters: it is the order in which the descriptors are reported.
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                METASTORE_URI,
                HIVE_CONFIGURATION_RESOURCES,
                DB_NAME,
                TABLE_NAME,
                PARTITION_COLUMNS,
                AUTOCREATE_PARTITIONS,
                MAX_OPEN_CONNECTIONS,
                HEARTBEAT_INTERVAL,
                TXNS_PER_BATCH,
                RECORDS_PER_TXN,
                CALL_TIMEOUT,
                ROLLBACK_ON_FAILURE,
                KERBEROS_CREDENTIALS_SERVICE,
                this.kerberosProperties.getKerberosPrincipal(),
                this.kerberosProperties.getKerberosKeytab(),
                this.kerberosProperties.getKerberosPassword());
        this.propertyDescriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> rels = new HashSet<>();
        Collections.addAll(rels, REL_SUCCESS, REL_FAILURE, REL_RETRY);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /** @return the unmodifiable property descriptor list built by {@code init} */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /** @return the unmodifiable relationship set built by {@code init} */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Validates the Hive configuration resources together with the Kerberos settings. Nothing is
     * checked unless the Hive configuration resources property is set.
     *
     * @param validationContext context giving access to the configured property values
     * @return the problems found; empty when the configuration is acceptable
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>();
        if (!validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet()) {
            return problems;
        }

        final String explicitPrincipal = validationContext.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
        final String explicitKeytab = validationContext.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
        final String explicitPassword = validationContext.getProperty(this.kerberosProperties.getKerberosPassword()).getValue();
        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // A configured credentials service takes precedence over the explicit properties.
        final String resolvedPrincipal = credentialsService == null ? explicitPrincipal : credentialsService.getPrincipal();
        final String resolvedKeytab = credentialsService == null ? explicitKeytab : credentialsService.getKeytab();

        final String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
        problems.addAll(this.hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, this.validationResourceHolder, getLogger()));

        final boolean anyExplicitCredential = explicitPrincipal != null || explicitKeytab != null || explicitPassword != null;
        if (credentialsService != null && anyExplicitCredential) {
            problems.add(new ValidationResult.Builder()
                    .subject("Kerberos Credentials")
                    .valid(false)
                    .explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password")
                    .build());
        }

        if (!isAllowExplicitKeytab() && explicitKeytab != null) {
            problems.add(new ValidationResult.Builder()
                    .subject("Kerberos Credentials")
                    .valid(false)
                    .explanation("The 'NIFI_ALLOW_EXPLICIT_KEYTAB' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
                    .build());
        }
        return problems;
    }

    /**
     * Prepares the processor for running: loads the Hive configuration (including dynamic
     * properties), logs in with Kerberos when Hive security is enabled, and creates the call
     * timeout pool and the heartbeat timer.
     *
     * @param processContext context giving access to the configured property values
     * @throws ProcessException if security is enabled and neither a keytab nor a password is
     *                          available, or if Kerberos authentication fails
     */
    @OnScheduled
    public void setup(ProcessContext processContext) {
        final ComponentLog log = getLogger();
        final Integer heartbeatInterval = processContext.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
        final String configFiles = processContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
        this.hiveConfig = this.hiveConfigurator.getConfigurationFromFiles(configFiles);

        if (processContext.getMaxConcurrentTasks() > 1) {
            this.hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
        }

        // Dynamic (user-added) properties are passed straight through to the Hive configuration.
        for (Map.Entry<PropertyDescriptor, String> property : processContext.getProperties().entrySet()) {
            final PropertyDescriptor descriptor = property.getKey();
            if (descriptor.isDynamic()) {
                this.hiveConfig.set(descriptor.getName(), property.getValue());
            }
        }
        this.hiveConfigurator.preload(this.hiveConfig);

        if (!SecurityUtil.isSecurityEnabled(this.hiveConfig)) {
            this.ugi = null;
            this.kerberosUserReference.set(null);
        } else {
            final String explicitPrincipal = processContext.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
            final String explicitKeytab = processContext.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
            final String explicitPassword = processContext.getProperty(this.kerberosProperties.getKerberosPassword()).getValue();
            final KerberosCredentialsService credentialsService = processContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

            // A configured credentials service takes precedence over the explicit properties.
            final String resolvedPrincipal = credentialsService == null ? explicitPrincipal : credentialsService.getPrincipal();
            final String resolvedKeytab = credentialsService == null ? explicitKeytab : credentialsService.getKeytab();

            if (resolvedKeytab != null) {
                this.kerberosUserReference.set(new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
            } else if (explicitPassword != null) {
                this.kerberosUserReference.set(new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
            } else {
                throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
            }

            try {
                this.ugi = this.hiveConfigurator.authenticate(this.hiveConfig, this.kerberosUserReference.get());
                log.info("Successfully logged in as principal " + resolvedPrincipal);
            } catch (AuthenticationFailedException authFailure) {
                throw new ProcessException("Kerberos authentication failed for Hive Streaming", authFailure);
            }
        }

        // The property value is multiplied by 1000 before being stored.
        this.callTimeout = processContext.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000;
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("put-hive-streaming-%d").build());
        this.sendHeartBeat.set(true);
        this.heartBeatTimer = new Timer();
        setupHeartBeatTimer(heartbeatInterval);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the error handler applied to a batch of records that could not be written.
     *
     * <p>The handler first yields the processor when the error's penalty is {@code Yield}, then
     * acts on the error's destination:
     * <ul>
     *   <li>{@code Failure}: logs the error and appends the records to the failure flow file;</li>
     *   <li>{@code Retry}: aborts and closes the writers and throws a {@code ShouldRetryException};</li>
     *   <li>{@code Self}: logs the error and aborts and closes the writers;</li>
     *   <li>{@code ProcessException} and anything else: aborts and closes the writers and rethrows
     *       the cause, wrapped in a {@link ProcessException} unless it already is one.</li>
     * </ul>
     *
     * @param processContext context used to yield the processor
     * @param processSession session used when appending records to the failure flow file
     * @param map            the writers to abort and close on non-recoverable errors
     * @return the handler, wrapped by {@link RollbackOnFailure#createOnError}
     */
    public ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext processContext, ProcessSession processSession, Map<HiveEndPoint, HiveWriter> map) {
        return RollbackOnFailure.createOnError((functionContext, list, result, exc) -> {
            if (result.penalty() == ErrorTypes.Penalty.Yield) {
                processContext.yield();
            }
            // Switch on the enum itself rather than on an ordinal-indexed lookup array with magic case numbers.
            switch (result.destination()) {
                case Failure:
                    getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", list, exc), exc);
                    functionContext.appendRecordsToFailure(processSession, list);
                    return;
                case Retry:
                    abortAndCloseWriters(map);
                    throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + exc, exc);
                case Self:
                    getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", list, exc), exc);
                    abortAndCloseWriters(map);
                    return;
                case ProcessException:
                default:
                    abortAndCloseWriters(map);
                    if (exc instanceof ProcessException) {
                        throw (ProcessException) exc;
                    }
                    throw new ProcessException(String.format("Error writing %s to Hive Streaming transaction due to %s", list, exc), exc);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the error handler for a single {@code HiveStreamingRecord}. It wraps the record in
     * a one-element list and delegates to the handler from {@code onHiveRecordsError}.
     */
    public ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext processContext, ProcessSession processSession, Map<HiveEndPoint, HiveWriter> map) {
        return (ctx, failedRecord, errorResult, cause) ->
                onHiveRecordsError(processContext, processSession, map)
                        .apply(ctx, Collections.singletonList(failedRecord), errorResult, cause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates the error handler for a raw Avro record. It wraps the record in a
     * {@code HiveStreamingRecord} with {@code null} partition values and delegates to the handler
     * from {@code onHiveRecordError}.
     */
    public ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext processContext, ProcessSession processSession, Map<HiveEndPoint, HiveWriter> map) {
        return (ctx, avroRecord, errorResult, cause) ->
                onHiveRecordError(processContext, processSession, map)
                        .apply(ctx, new HiveStreamingRecord(null, avroRecord), errorResult, cause);
    }

    /**
     * Entry point invoked by the framework. Creates a fresh {@code FunctionContext} carrying the
     * rollback-on-failure setting and runs the session-level {@code onTrigger} through
     * {@link RollbackOnFailure#onTrigger}.
     *
     * @param processContext        context giving access to the configured property values
     * @param processSessionFactory factory handed to {@code RollbackOnFailure.onTrigger}
     * @throws ProcessException if processing fails
     */
    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        final boolean rollbackOnFailure = processContext.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
        final FunctionContext ctx = new FunctionContext(rollbackOnFailure, getLogger());
        RollbackOnFailure.onTrigger(processContext, processSessionFactory, ctx, getLogger(),
                session -> onTrigger(processContext, session, ctx));
    }

    private void onTrigger(final ProcessContext processContext, final ProcessSession processSession, final FunctionContext functionContext) throws ProcessException {
        List emptyList;
        String principal;
        String keytab;
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        String value = processContext.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        Semaphore semaphore = new Semaphore(1);
        Semaphore putIfAbsent = this.tableSemaphoreMap.putIfAbsent(value + "." + value2, semaphore);
        if (putIfAbsent == null) {
            putIfAbsent = semaphore;
        }
        boolean z = false;
        try {
            z = putIfAbsent.tryAcquire(0L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
        }
        if (!z) {
            processSession.rollback();
            return;
        }
        final ComponentLog logger = getLogger();
        String value3 = processContext.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
        boolean booleanValue = processContext.getProperty(AUTOCREATE_PARTITIONS).asBoolean().booleanValue();
        Integer asInteger = processContext.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
        Integer asInteger2 = processContext.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
        Integer asInteger3 = processContext.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
        final Integer asInteger4 = processContext.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.threadWriterList.add(concurrentHashMap);
        HiveOptions withCallTimeout = new HiveOptions(value3, value, value2).withTxnsPerBatch(asInteger3).withAutoCreatePartitions(Boolean.valueOf(booleanValue)).withMaxOpenConnections(asInteger).withHeartBeatInterval(asInteger2).withCallTimeout(Integer.valueOf(this.callTimeout));
        if (SecurityUtil.isSecurityEnabled(this.hiveConfig)) {
            String value4 = processContext.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
            String value5 = processContext.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
            KerberosCredentialsService asControllerService = processContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            if (asControllerService == null) {
                principal = value4;
                keytab = value5;
            } else {
                principal = asControllerService.getPrincipal();
                keytab = asControllerService.getKeytab();
            }
            withCallTimeout = withCallTimeout.withKerberosPrincipal(principal).withKerberosKeytab(keytab);
        }
        final HiveOptions hiveOptions = withCallTimeout;
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        String value6 = processContext.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue();
        if (value6 == null || value6.isEmpty()) {
            emptyList = Collections.emptyList();
        } else {
            String[] split = value6.split(",");
            emptyList = new ArrayList(split.length);
            for (String str : split) {
                emptyList.add(str.trim());
            }
        }
        final AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new ArrayList());
        final RoutingResult routingResult = new RoutingResult();
        final ExceptionHandler exceptionHandler = new ExceptionHandler();
        exceptionHandler.mapException(exc -> {
            try {
                try {
                    try {
                        if (exc == null) {
                            return ErrorTypes.PersistentFailure;
                        }
                        throw exc;
                    } catch (Exception e2) {
                        return ErrorTypes.UnknownFailure;
                    }
                } catch (IllegalArgumentException | HiveWriter.WriteFailure | SerializationError e3) {
                    return ErrorTypes.InvalidInput;
                } catch (HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure e4) {
                    return ErrorTypes.TemporalInputFailure;
                }
            } catch (IOException | InterruptedException e5) {
                return ErrorTypes.TemporalFailure;
            } catch (ConnectionError | HiveWriter.ConnectFailure e6) {
                logger.error("Error connecting to Hive endpoint: table {} at {}", new Object[]{hiveOptions.getTableName(), hiveOptions.getMetaStoreURI()});
                return ErrorTypes.TemporalFailure;
            }
        });
        final BiFunction createAdjustError = RollbackOnFailure.createAdjustError(getLogger());
        exceptionHandler.adjustError(createAdjustError);
        functionContext.setFlowFiles(processSession.create(flowFile), processSession.create(flowFile));
        try {
            try {
                try {
                    final List list = emptyList;
                    processSession.read(flowFile, new InputStreamCallback() { // from class: org.apache.nifi.processors.hive.PutHiveStreaming.1
                        public void process(InputStream inputStream) throws IOException {
                            try {
                                DataFileStream dataFileStream = new DataFileStream(inputStream, new GenericDatumReader());
                                Throwable th = null;
                                try {
                                    try {
                                        functionContext.initAvroWriters(processSession, dataFileStream.getMetaString("avro.codec") == null ? "null" : dataFileStream.getMetaString("avro.codec"), dataFileStream);
                                        FunctionContext functionContext2 = functionContext;
                                        ProcessSession processSession2 = processSession;
                                        AtomicReference atomicReference2 = atomicReference;
                                        Runnable runnable = () -> {
                                            functionContext2.appendRecordsToSuccess(processSession2, (List) atomicReference2.get());
                                            atomicReference2.set(new ArrayList());
                                        };
                                        while (dataFileStream.hasNext()) {
                                            GenericRecord genericRecord = (GenericRecord) dataFileStream.next();
                                            functionContext.recordCount.incrementAndGet();
                                            ArrayList arrayList = new ArrayList();
                                            ExceptionHandler exceptionHandler2 = exceptionHandler;
                                            FunctionContext functionContext3 = functionContext;
                                            List list2 = list;
                                            if (exceptionHandler2.execute(functionContext3, genericRecord, genericRecord2 -> {
                                                Iterator it = list2.iterator();
                                                while (it.hasNext()) {
                                                    String str2 = (String) it.next();
                                                    Object obj = genericRecord2.get(str2);
                                                    if (obj == null) {
                                                        throw new IllegalArgumentException("Partition column '" + str2 + "' not found in Avro record");
                                                    }
                                                    arrayList.add(obj.toString());
                                                }
                                            }, PutHiveStreaming.this.onRecordError(processContext, processSession, concurrentHashMap))) {
                                                HiveStreamingRecord hiveStreamingRecord = new HiveStreamingRecord(arrayList, genericRecord);
                                                AtomicReference atomicReference3 = new AtomicReference();
                                                ExceptionHandler exceptionHandler3 = exceptionHandler;
                                                FunctionContext functionContext4 = functionContext;
                                                HiveOptions hiveOptions2 = hiveOptions;
                                                Map map = concurrentHashMap;
                                                AtomicReference atomicReference4 = atomicReference;
                                                if (exceptionHandler3.execute(functionContext4, hiveStreamingRecord, hiveStreamingRecord2 -> {
                                                    HiveWriter orCreateWriter = PutHiveStreaming.this.getOrCreateWriter(map, hiveOptions2, PutHiveStreaming.this.makeHiveEndPoint(hiveStreamingRecord.getPartitionValues(), hiveOptions2));
                                                    atomicReference3.set(orCreateWriter);
                                                    orCreateWriter.write(hiveStreamingRecord.getRecord().toString().getBytes(StandardCharsets.UTF_8));
                                                    ((List) atomicReference4.get()).add(hiveStreamingRecord);
                                                }, PutHiveStreaming.this.onHiveRecordError(processContext, processSession, concurrentHashMap))) {
                                                    HiveWriter hiveWriter = (HiveWriter) atomicReference3.get();
                                                    if (hiveWriter.getTotalRecords() >= asInteger4.intValue()) {
                                                        ExceptionHandler exceptionHandler4 = exceptionHandler;
                                                        FunctionContext functionContext5 = functionContext;
                                                        Object obj = atomicReference.get();
                                                        FunctionContext functionContext6 = functionContext;
                                                        exceptionHandler4.execute(functionContext5, obj, list3 -> {
                                                            hiveWriter.flush(true);
                                                            functionContext6.proceed();
                                                            runnable.run();
                                                        }, PutHiveStreaming.this.onHiveRecordsError(processContext, processSession, concurrentHashMap).andThen((functionContext7, list4, result, exc2) -> {
                                                            switch (AnonymousClass3.$SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[result.destination().ordinal()]) {
                                                                case 1:
                                                                case 3:
                                                                    try {
                                                                        hiveWriter.abort();
                                                                        return;
                                                                    } catch (Exception e2) {
                                                                        throw new ProcessException(e2);
                                                                    }
                                                                default:
                                                                    return;
                                                            }
                                                        }));
                                                    }
                                                }
                                            }
                                        }
                                        ExceptionHandler exceptionHandler5 = exceptionHandler;
                                        FunctionContext functionContext8 = functionContext;
                                        Object obj2 = atomicReference.get();
                                        Map map2 = concurrentHashMap;
                                        exceptionHandler5.execute(functionContext8, obj2, list5 -> {
                                            PutHiveStreaming.this.flushAllWriters(map2, true);
                                            PutHiveStreaming.this.closeAllWriters(map2);
                                            runnable.run();
                                        }, PutHiveStreaming.this.onHiveRecordsError(processContext, processSession, concurrentHashMap));
                                        if (dataFileStream != null) {
                                            if (0 != 0) {
                                                try {
                                                    dataFileStream.close();
                                                } catch (Throwable th2) {
                                                    th.addSuppressed(th2);
                                                }
                                            } else {
                                                dataFileStream.close();
                                            }
                                        }
                                    } finally {
                                    }
                                } catch (Throwable th3) {
                                    th = th3;
                                    throw th3;
                                }
                            } catch (IOException e2) {
                                switch (AnonymousClass3.$SwitchMap$org$apache$nifi$processor$util$pattern$ErrorTypes$Destination[((ErrorTypes.Result) createAdjustError.apply(functionContext, ErrorTypes.InvalidInput)).destination().ordinal()]) {
                                    case 1:
                                        logger.error("The incoming flow file can not be read as an Avro file", e2);
                                        routingResult.routeTo(flowFile, PutHiveStreaming.REL_FAILURE);
                                        return;
                                    case 2:
                                        throw new ProcessException("The incoming flow file can not be read as an Avro file", e2);
                                    default:
                                        return;
                                }
                            }
                        }
                    });
                    if (routingResult.getRoutedFlowFiles().values().stream().noneMatch(list2 -> {
                        return list2.contains(flowFile);
                    })) {
                        processSession.remove(flowFile);
                    }
                    this.threadWriterList.remove(concurrentHashMap);
                    functionContext.transferFlowFiles(processSession, routingResult, hiveOptions);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    putIfAbsent.release();
                } catch (DiscontinuedException e2) {
                    getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e2}, e2);
                    routingResult.routeTo(flowFile, Relationship.SELF);
                    this.threadWriterList.remove(concurrentHashMap);
                    functionContext.transferFlowFiles(processSession, routingResult, hiveOptions);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    putIfAbsent.release();
                }
            } catch (ShouldRetryException e3) {
                getLogger().error(e3.getMessage(), e3);
                routingResult.routeTo(processSession.penalize(flowFile), REL_RETRY);
                this.threadWriterList.remove(concurrentHashMap);
                functionContext.transferFlowFiles(processSession, routingResult, hiveOptions);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                putIfAbsent.release();
            }
        } catch (Throwable th) {
            this.threadWriterList.remove(concurrentHashMap);
            functionContext.transferFlowFiles(processSession, routingResult, hiveOptions);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            putIfAbsent.release();
            throw th;
        }
    }

    /**
     * Releases the resources held by this processor when it is stopped.
     *
     * <p>Clears the validation resource holder, disables heartbeating, flushes and closes every
     * writer registered in {@code threadWriterList}, shuts down the call-timeout pool and finally
     * drops the cached UGI and Kerberos user.</p>
     *
     * <p>Failures while closing an individual writer are logged and do not prevent the remaining
     * writers from being closed. If the calling thread is interrupted while waiting for the
     * call-timeout pool to terminate, the interrupt status is restored and the wait is abandoned
     * instead of being silently retried.</p>
     */
    @OnStopped
    public void cleanup() {
        this.validationResourceHolder.set(null);
        final ComponentLog logger = getLogger();
        // The heartbeat task checks this flag before re-scheduling itself.
        this.sendHeartBeat.set(false);

        for (Map<HiveEndPoint, HiveWriter> writers : this.threadWriterList) {
            for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
                try {
                    entry.getValue().flushAndClose();
                } catch (Exception e) {
                    logger.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", e);
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            writers.clear();
        }

        if (this.callTimeoutPool != null) {
            this.callTimeoutPool.shutdown();
            while (!this.callTimeoutPool.isTerminated()) {
                try {
                    this.callTimeoutPool.awaitTermination(this.callTimeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    logger.warn("shutdown interrupted on " + this.callTimeoutPool, e);
                    // Preserve the interrupt for the caller and stop blocking: the pool has already
                    // been asked to shut down, we just no longer wait for it.
                    Thread.currentThread().interrupt();
                    break;
                } catch (Throwable th) {
                    logger.warn("shutdown interrupted on " + this.callTimeoutPool, th);
                }
            }
            this.callTimeoutPool = null;
        }

        this.ugi = null;
        this.kerberosUserReference.set(null);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a one-shot heartbeat task on {@code heartBeatTimer} that fires after {@code i}
     * seconds. When it fires and {@code sendHeartBeat} is set, the task heartbeats all writers and
     * schedules the next run by calling this method again.
     *
     * @param i heartbeat interval in seconds; values {@code <= 0} schedule nothing
     */
    public void setupHeartBeatTimer(final int i) {
        if (i <= 0) {
            return;
        }
        final ComponentLog logger = getLogger();
        final TimerTask heartBeatTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    if (PutHiveStreaming.this.sendHeartBeat.get()) {
                        logger.debug("Start sending heartbeat on all writers");
                        PutHiveStreaming.this.sendHeartBeatOnAllWriters();
                        PutHiveStreaming.this.setupHeartBeatTimer(i);
                    }
                } catch (Exception e) {
                    logger.warn("Failed to heartbeat on HiveWriter ", e);
                }
            }
        };
        // Multiply as long: int arithmetic overflows for large intervals and would hand
        // Timer.schedule a negative delay, which it rejects with IllegalArgumentException.
        this.heartBeatTimer.schedule(heartBeatTask, i * 1000L);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes {@code heartBeat()} on every writer of every map registered in
     * {@code threadWriterList}.
     *
     * @throws InterruptedException propagated from {@code HiveWriter.heartBeat()}
     */
    public void sendHeartBeatOnAllWriters() throws InterruptedException {
        for (Map<HiveEndPoint, HiveWriter> writersOfThread : this.threadWriterList) {
            for (HiveWriter writer : writersOfThread.values()) {
                writer.heartBeat();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flushes every writer in the given map, passing {@code z} through to each
     * {@code HiveWriter.flush(boolean)} call. The first failure aborts the loop and propagates.
     *
     * @param map writers to flush
     * @param z   flag forwarded unchanged to {@code HiveWriter.flush}
     */
    public void flushAllWriters(Map<HiveEndPoint, HiveWriter> map, boolean z) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : map.values()) {
            writer.flush(z);
        }
    }

    /**
     * Aborts and then closes all writers in the given map. Any exception raised along the way is
     * logged as a warning and not rethrown.
     *
     * @param map writers to abort and close
     */
    private void abortAndCloseWriters(Map<HiveEndPoint, HiveWriter> map) {
        try {
            this.abortAllWriters(map);
            this.closeAllWriters(map);
        } catch (Exception failure) {
            final String message = "unable to close hive connections. ";
            getLogger().warn(message, failure);
        }
    }

    /**
     * Calls {@code abort()} on every writer in the given map. A failure on one writer is logged as
     * an error and the loop moves on to the next writer.
     *
     * @param map writers whose current work should be aborted
     */
    private void abortAllWriters(Map<HiveEndPoint, HiveWriter> map) throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        final Iterator<Map.Entry<HiveEndPoint, HiveWriter>> entries = map.entrySet().iterator();
        while (entries.hasNext()) {
            final Map.Entry<HiveEndPoint, HiveWriter> current = entries.next();
            try {
                final HiveWriter writer = current.getValue();
                writer.abort();
            } catch (Exception failure) {
                getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + current.getValue() + " due to exception ", failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes every writer in the given map and then empties the map. A failure while closing one
     * writer is logged as a warning and does not stop the remaining writers from being closed.
     *
     * @param map writers to close; cleared before returning
     */
    public void closeAllWriters(Map<HiveEndPoint, HiveWriter> map) {
        for (Iterator<HiveWriter> writers = map.values().iterator(); writers.hasNext(); ) {
            try {
                writers.next().close();
            } catch (Exception failure) {
                getLogger().warn("unable to close writers. ", failure);
            }
        }
        map.clear();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the writer cached for {@code hiveEndPoint}, creating and caching a new one when the
     * map has none.
     *
     * <p>When a new writer is created and the map already holds at least
     * {@code hiveOptions.getMaxOpenConnections()} entries, idle writers are retired first; if none
     * were idle, the least recently used writer is retired. A negative maximum means "no limit"
     * (as the Max Open Connections property description states), so no writer is retired in
     * that case.</p>
     *
     * @param map          cache of writers keyed by end point; updated in place
     * @param hiveOptions  options supplying the connection limit and idle timeout
     * @param hiveEndPoint end point the writer is needed for
     * @return the cached or newly created writer
     * @throws HiveWriter.ConnectFailure if a new writer cannot be created (logged, then rethrown)
     * @throws InterruptedException      propagated from writer creation
     */
    public HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> map, HiveOptions hiveOptions, HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        final ComponentLog logger = getLogger();
        try {
            HiveWriter hiveWriter = map.get(hiveEndPoint);
            if (hiveWriter != null) {
                return hiveWriter;
            }

            logger.debug("Creating Writer to Hive end point : " + hiveEndPoint);
            // Create the new writer before evicting anything, so a failed connect leaves the cache intact.
            hiveWriter = makeHiveWriter(hiveEndPoint, this.callTimeoutPool, getUgi(), hiveOptions);

            final int maxOpenConnections = hiveOptions.getMaxOpenConnections().intValue();
            // Negative limit == unlimited; without this guard "size > max - 1" is always true
            // for negative values and every new writer would evict an existing one.
            if (maxOpenConnections >= 0 && map.size() > maxOpenConnections - 1) {
                logger.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{Integer.valueOf(map.size()), hiveOptions.getMaxOpenConnections()});
                if (retireIdleWriters(map, hiveOptions.getIdleTimeout().intValue()) == 0) {
                    retireEldestWriter(map);
                }
            }

            map.put(hiveEndPoint, hiveWriter);
            HiveUtils.logAllHiveEndPoints(map);
            return hiveWriter;
        } catch (HiveWriter.ConnectFailure e) {
            logger.error("Failed to create HiveWriter for endpoint: " + hiveEndPoint, e);
            throw e;
        }
    }

    /**
     * Removes the writer with the smallest {@code getLastUsed()} value from the map and flushes
     * and closes it.
     *
     * <p>Does nothing (beyond logging) when the map is empty. Failures while closing are logged as
     * warnings; an interruption additionally restores the thread's interrupt status.</p>
     *
     * @param map cache of writers keyed by end point; the retired entry is removed from it
     */
    private void retireEldestWriter(Map<HiveEndPoint, HiveWriter> map) {
        final ComponentLog logger = getLogger();
        logger.info("Attempting close eldest writers");

        // Start from "no candidate" rather than from the current time, so that a writer used in
        // this very millisecond is still eligible and a non-empty map always yields a result.
        HiveEndPoint eldestEndPoint = null;
        long eldestLastUsed = Long.MAX_VALUE;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : map.entrySet()) {
            final long lastUsed = entry.getValue().getLastUsed();
            if (eldestEndPoint == null || lastUsed < eldestLastUsed) {
                eldestEndPoint = entry.getKey();
                eldestLastUsed = lastUsed;
            }
        }

        if (eldestEndPoint == null) {
            // Empty map: map.remove(null) would throw or return null, so bail out explicitly.
            logger.debug("No writer available to retire");
            return;
        }

        try {
            logger.info("Closing least used Writer to Hive end point : " + eldestEndPoint);
            final HiveWriter eldestWriter = map.remove(eldestEndPoint);
            if (eldestWriter != null) {
                eldestWriter.flushAndClose();
            }
        } catch (InterruptedException e) {
            logger.warn("Interrupted when attempting to close writer for end point: " + eldestEndPoint, e);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logger.warn("Failed to close writer for end point: " + eldestEndPoint, e);
        }
    }

    /**
     * Removes, flushes and closes every writer whose {@code getLastUsed()} timestamp is more than
     * {@code i} older than the current {@code System.currentTimeMillis()}.
     *
     * <p>Idle end points are collected first and removed in a second pass so the map is not
     * modified while it is being iterated. A writer is removed from the map even if closing it
     * fails; such failures are logged as warnings.</p>
     *
     * @param map cache of writers keyed by end point; idle entries are removed from it
     * @param i   idle threshold, compared against a millisecond time difference
     * @return the number of writers that were identified as idle
     */
    private int retireIdleWriters(Map<HiveEndPoint, HiveWriter> map, int i) {
        final ComponentLog logger = getLogger();
        logger.info("Attempting to close idle HiveWriters");

        final long now = System.currentTimeMillis();
        final List<HiveEndPoint> idleEndPoints = new ArrayList<>();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : map.entrySet()) {
            if (now - entry.getValue().getLastUsed() > i) {
                idleEndPoints.add(entry.getKey());
            }
        }

        for (HiveEndPoint idleEndPoint : idleEndPoints) {
            try {
                logger.info("Closing idle Writer to Hive end point : {}", new Object[]{idleEndPoint});
                map.remove(idleEndPoint).flushAndClose();
            } catch (InterruptedException e) {
                logger.warn("Interrupted when attempting to close HiveWriter for end point: " + idleEndPoint, e);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // Covers IOException and anything else: report it as a close failure, not an interrupt.
                logger.warn("Failed to close HiveWriter for end point: " + idleEndPoint, e);
            }
        }
        return idleEndPoints.size();
    }

    /**
     * Builds the {@link HiveEndPoint} for the given partition values by delegating to
     * {@code HiveUtils.makeEndPoint}.
     *
     * @param list        partition values passed through to {@code HiveUtils.makeEndPoint}
     * @param hiveOptions options passed through to {@code HiveUtils.makeEndPoint}
     * @return the end point produced by {@code HiveUtils}
     * @throws ConnectionError propagated from {@code HiveUtils.makeEndPoint}
     */
    protected HiveEndPoint makeHiveEndPoint(List<String> list, HiveOptions hiveOptions) throws ConnectionError {
        final HiveEndPoint endPoint = HiveUtils.makeEndPoint(list, hiveOptions);
        return endPoint;
    }

    /**
     * Creates a {@code HiveWriter} for the given end point by delegating to
     * {@code HiveUtils.makeHiveWriter}, additionally supplying this processor's {@code hiveConfig}.
     *
     * @param hiveEndPoint         end point the writer targets
     * @param executorService      executor handed to the writer
     * @param userGroupInformation UGI handed to the writer
     * @param hiveOptions          options handed to the writer
     * @return the writer produced by {@code HiveUtils}
     */
    protected HiveWriter makeHiveWriter(HiveEndPoint hiveEndPoint, ExecutorService executorService, UserGroupInformation userGroupInformation, HiveOptions hiveOptions) throws HiveWriter.ConnectFailure, InterruptedException {
        final HiveWriter writer = HiveUtils.makeHiveWriter(
                hiveEndPoint,
                executorService,
                userGroupInformation,
                hiveOptions,
                this.hiveConfig);
        return writer;
    }

    /**
     * @return the {@code KerberosProperties} instance held by this processor
     */
    protected KerberosProperties getKerberosProperties() {
        final KerberosProperties properties = this.kerberosProperties;
        return properties;
    }

    /**
     * Returns the processor's current {@link UserGroupInformation}, first giving the
     * {@code KerberosUser} (when one is set) the chance to check its TGT and relogin.
     *
     * <p>The Kerberos user reference is read exactly once, so a concurrent reset of the reference
     * (for example by {@code cleanup()}) cannot slip in between the null check and the use.</p>
     *
     * @return the value of the {@code ugi} field, which may be {@code null}
     * @throws ProcessException if the Kerberos relogin fails
     */
    UserGroupInformation getUgi() {
        getLogger().trace("getting UGI instance");
        final KerberosUser kerberosUser = this.kerberosUserReference.get();
        if (kerberosUser == null) {
            getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
            return this.ugi;
        }

        getLogger().debug("kerberosUser is " + kerberosUser);
        try {
            getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
            kerberosUser.checkTGTAndRelogin();
        } catch (LoginException e) {
            throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
        }
        return this.ugi;
    }

    /**
     * Reads the environment variable named by {@code ALLOW_EXPLICIT_KEYTAB} and parses it as a
     * boolean.
     *
     * @return {@code true} only when the variable is set to "true" (case-insensitive, per
     *         {@link Boolean#parseBoolean(String)}); {@code false} otherwise, including when unset
     */
    boolean isAllowExplicitKeytab() {
        final String configured = System.getenv(ALLOW_EXPLICIT_KEYTAB);
        final boolean allowed = Boolean.parseBoolean(configured);
        return allowed;
    }

    // Class initialiser: touches Snappy once, then builds the reserved Avro metadata key set, the
    // property descriptors and the relationships exposed by this processor.
    static {
        // NOTE(review): presumably this call exists to force the Snappy native library to load
        // while the class is being initialised — confirm the intent.
        try {
            Snappy.compress("");
        } catch (IOException e) {
            // NOTE(review): a failure here is silently discarded; nothing is logged or rethrown.
        }
        // Avro file metadata keys collected into the unmodifiable RESERVED_METADATA set.
        HashSet hashSet = new HashSet();
        hashSet.add("avro.schema");
        hashSet.add("avro.codec");
        RESERVED_METADATA = Collections.unmodifiableSet(hashSet);
        // ---- Property descriptors ----
        // NOTE(review): the description below cites 9043 as the default metastore port; Hive's
        // usual default is believed to be 9083 — confirm and correct the text if so.
        METASTORE_URI = new PropertyDescriptor.Builder().name("hive-stream-metastore-uri").displayName("Hive Metastore URI").description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.URI_VALIDATOR).addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))).build();
        HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("hive-config-resources").displayName("Hive Configuration Resources").description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. Please see the Hive documentation for more details.").required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        DB_NAME = new PropertyDescriptor.Builder().name("hive-stream-database-name").displayName("Database Name").description("The name of the database in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TABLE_NAME = new PropertyDescriptor.Builder().name("hive-stream-table-name").displayName("Table Name").description("The name of the database table in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        // The validator regex accepts one or more comma-separated, non-empty entries.
        PARTITION_COLUMNS = new PropertyDescriptor.Builder().name("hive-stream-partition-cols").displayName("Partition Columns").description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must correspond exactly to the order of partition columns specified during the table creation.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))).build();
        AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder().name("hive-stream-autocreate-partition").displayName("Auto-Create Partitions").description("Flag indicating whether partitions should be automatically created").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder().name("hive-stream-max-open-connections").displayName("Max Open Connections").description("The maximum number of open connections that can be allocated from this pool at the same time, or negative for no limit.").defaultValue("8").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).sensitive(false).build();
        HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder().name("hive-stream-heartbeat-interval").displayName("Heartbeat Interval").description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. A value of 0 indicates that no heartbeat should be sent. Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.").defaultValue("60").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        TXNS_PER_BATCH = new PropertyDescriptor.Builder().name("hive-stream-transactions-per-batch").displayName("Transactions per Batch").description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("100").build();
        RECORDS_PER_TXN = new PropertyDescriptor.Builder().name("hive-stream-records-per-transaction").displayName("Records per Transaction").description("Number of records to process before committing the transaction. This value must be greater than 1.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("10000").build();
        CALL_TIMEOUT = new PropertyDescriptor.Builder().name("hive-stream-call-timeout").displayName("Call Timeout").description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        // Built by RollbackOnFailure; the string passed in is the extra note supplied for this processor.
        ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty("NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed, (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later) then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue. Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
        KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder().name("kerberos-credentials-service").displayName("Kerberos Credentials Service").description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosCredentialsService.class).required(false).build();
        // ---- Relationships ----
        REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.").build();
        REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.").build();
        REL_RETRY = new Relationship.Builder().name("retry").description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This can be used to provide a retry capability since full rollback is not possible.").build();
    }
}
