package org.apache.nifi.processors.hive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
import org.json.JSONException;
import org.json.JSONObject;

@CapabilityDescription("This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor.")
@TriggerSerially
@Tags({"hive", "streaming", "put", "database", "store"})
@WritesAttributes({@WritesAttribute(attribute = PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR, description = "This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")})
/* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming.class */
public class PutHiveStreaming extends AbstractProcessor {
    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    private static final Validator GREATER_THAN_ONE_VALIDATOR = (str, str2, validationContext) -> {
        if (validationContext.isExpressionLanguageSupported(str) && validationContext.isExpressionLanguagePresent(str2)) {
            return new ValidationResult.Builder().subject(str).input(str2).explanation("Expression Language Present").valid(true).build();
        }
        String str = null;
        try {
            if (Integer.parseInt(str2) < 2) {
                str = "value is less than 2";
            }
        } catch (NumberFormatException e) {
            str = "value is not a valid integer";
        }
        return new ValidationResult.Builder().subject(str).input(str2).explanation(str).valid(str == null).build();
    };
    private static final Set<String> RESERVED_METADATA;
    public static final PropertyDescriptor METASTORE_URI;
    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES;
    public static final PropertyDescriptor DB_NAME;
    public static final PropertyDescriptor TABLE_NAME;
    public static final PropertyDescriptor PARTITION_COLUMNS;
    public static final PropertyDescriptor AUTOCREATE_PARTITIONS;
    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS;
    public static final PropertyDescriptor HEARTBEAT_INTERVAL;
    public static final PropertyDescriptor TXNS_PER_BATCH;
    public static final Relationship REL_SUCCESS;
    public static final Relationship REL_FAILURE;
    public static final Relationship REL_RETRY;
    private List<PropertyDescriptor> propertyDescriptors;
    private Set<Relationship> relationships;
    private static final long TICKET_RENEWAL_PERIOD = 60000;
    protected KerberosProperties kerberosProperties;
    protected volatile UserGroupInformation ugi;
    protected HiveOptions options;
    protected ExecutorService callTimeoutPool;
    protected transient Timer heartBeatTimer;
    protected Map<HiveEndPoint, HiveWriter> allWriters;
    private volatile File kerberosConfigFile = null;
    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
    protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/nifi/processors/hive/PutHiveStreaming$HiveStreamingRecord.class */
    public class HiveStreamingRecord {
        private List<String> partitionValues;
        private GenericRecord record;

        public HiveStreamingRecord(List<String> list, GenericRecord genericRecord) {
            this.partitionValues = list;
            this.record = genericRecord;
        }

        public List<String> getPartitionValues() {
            return this.partitionValues;
        }

        public GenericRecord getRecord() {
            return this.record;
        }
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(METASTORE_URI);
        arrayList.add(HIVE_CONFIGURATION_RESOURCES);
        arrayList.add(DB_NAME);
        arrayList.add(TABLE_NAME);
        arrayList.add(PARTITION_COLUMNS);
        arrayList.add(AUTOCREATE_PARTITIONS);
        arrayList.add(MAX_OPEN_CONNECTIONS);
        arrayList.add(HEARTBEAT_INTERVAL);
        arrayList.add(TXNS_PER_BATCH);
        this.kerberosConfigFile = processorInitializationContext.getKerberosConfigurationFile();
        this.kerberosProperties = new KerberosProperties(this.kerberosConfigFile);
        arrayList.add(this.kerberosProperties.getKerberosPrincipal());
        arrayList.add(this.kerberosProperties.getKerberosKeytab());
        this.propertyDescriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.add(REL_FAILURE);
        hashSet.add(REL_RETRY);
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @OnScheduled
    public void setup(ProcessContext processContext) {
        ComponentLog logger = getLogger();
        String value = processContext.getProperty(METASTORE_URI).getValue();
        String value2 = processContext.getProperty(DB_NAME).getValue();
        String value3 = processContext.getProperty(TABLE_NAME).getValue();
        boolean booleanValue = processContext.getProperty(AUTOCREATE_PARTITIONS).asBoolean().booleanValue();
        Integer asInteger = processContext.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
        Integer asInteger2 = processContext.getProperty(HEARTBEAT_INTERVAL).asInteger();
        Integer asInteger3 = processContext.getProperty(TXNS_PER_BATCH).asInteger();
        Configuration configurationFromFiles = this.hiveConfigurator.getConfigurationFromFiles(processContext.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue());
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            PropertyDescriptor propertyDescriptor = (PropertyDescriptor) entry.getKey();
            if (propertyDescriptor.isDynamic()) {
                configurationFromFiles.set(propertyDescriptor.getName(), (String) entry.getValue());
            }
        }
        this.options = new HiveOptions(value, value2, value3).withTxnsPerBatch(asInteger3).withAutoCreatePartitions(Boolean.valueOf(booleanValue)).withMaxOpenConnections(asInteger).withHeartBeatInterval(asInteger2);
        this.hiveConfigurator.preload(configurationFromFiles);
        if (SecurityUtil.isSecurityEnabled(configurationFromFiles)) {
            String value4 = processContext.getProperty(this.kerberosProperties.getKerberosPrincipal()).getValue();
            String value5 = processContext.getProperty(this.kerberosProperties.getKerberosKeytab()).getValue();
            logger.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{value4, value5});
            try {
                this.ugi = this.hiveConfigurator.authenticate(configurationFromFiles, value4, value5, TICKET_RENEWAL_PERIOD, logger);
                logger.info("Successfully logged in as principal {} with keytab {}", new Object[]{value4, value5});
                this.options = this.options.withKerberosPrincipal(value4).withKerberosKeytab(value5);
            } catch (AuthenticationFailedException e) {
                throw new ProcessException("Kerberos authentication failed for Hive Streaming", e);
            }
        }
        this.allWriters = new ConcurrentHashMap();
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("put-hive-streaming-%d").build());
        this.sendHeartBeat.set(true);
        this.heartBeatTimer = new Timer();
        setupHeartBeatTimer();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        List emptyList;
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = getLogger();
        Integer asInteger = processContext.getProperty(TXNS_PER_BATCH).asInteger();
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        String value = processContext.getProperty(PARTITION_COLUMNS).getValue();
        if (value == null || value.isEmpty()) {
            emptyList = Collections.emptyList();
        } else {
            String[] split = value.split(",");
            emptyList = new ArrayList(split.length);
            for (String str : split) {
                emptyList.add(str.trim());
            }
        }
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        LinkedList linkedList = new LinkedList();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference atomicReference = new AtomicReference(processSession.create(flowFile));
        DataFileWriter dataFileWriter = new DataFileWriter(new GenericDatumWriter());
        AtomicReference atomicReference2 = new AtomicReference(processSession.create(flowFile));
        DataFileWriter dataFileWriter2 = new DataFileWriter(new GenericDatumWriter());
        try {
            try {
                List list = emptyList;
                processSession.read(flowFile, inputStream -> {
                    try {
                        try {
                            DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(inputStream, new GenericDatumReader());
                            Throwable th = null;
                            GenericRecord genericRecord = null;
                            String metaString = dataFileStream.getMetaString("avro.codec") == null ? "null" : dataFileStream.getMetaString("avro.codec");
                            Arrays.asList(dataFileWriter, dataFileWriter2).forEach(dataFileWriter3 -> {
                                dataFileWriter3.setCodec(CodecFactory.fromString(metaString));
                                for (String str2 : dataFileStream.getMetaKeys()) {
                                    if (!RESERVED_METADATA.contains(str2)) {
                                        dataFileWriter3.setMeta(str2, dataFileStream.getMeta(str2));
                                    }
                                }
                            });
                            while (dataFileStream.hasNext()) {
                                genericRecord = (GenericRecord) dataFileStream.next(genericRecord);
                                atomicInteger.incrementAndGet();
                                ArrayList arrayList = new ArrayList();
                                try {
                                    for (String str2 : list) {
                                        Object obj = genericRecord.get(str2);
                                        if (obj == null) {
                                            throw new IOException("Partition column '" + str2 + "' not found in Avro record");
                                            break;
                                        }
                                        arrayList.add(obj.toString());
                                    }
                                    List fields = genericRecord.getSchema().getFields();
                                    if (fields != null) {
                                        JSONObject jSONObject = new JSONObject();
                                        try {
                                            Iterator it = fields.iterator();
                                            while (it.hasNext()) {
                                                String name = ((Schema.Field) it.next()).name();
                                                if (!list.contains(name)) {
                                                    try {
                                                        jSONObject.put(name, genericRecord.get(name));
                                                    } catch (JSONException e) {
                                                        throw new IOException(e);
                                                        break;
                                                    }
                                                }
                                            }
                                            HiveStreamingRecord hiveStreamingRecord = new HiveStreamingRecord(arrayList, genericRecord);
                                            try {
                                                HiveWriter orCreateWriter = getOrCreateWriter(makeHiveEndPoint(hiveStreamingRecord.getPartitionValues(), this.options));
                                                try {
                                                    try {
                                                        orCreateWriter.write(hiveStreamingRecord.getRecord().toString().getBytes(StandardCharsets.UTF_8));
                                                        linkedList.add(hiveStreamingRecord);
                                                    } catch (InterruptedException | HiveWriter.WriteFailure e2) {
                                                        logger.error("Error writing record to Hive Streaming transaction", e2);
                                                        appendRecordsToFlowFile(processSession, Collections.singletonList(hiveStreamingRecord), atomicReference2, dataFileWriter2, dataFileStream);
                                                    }
                                                    if (orCreateWriter.getTotalRecords() >= asInteger.intValue()) {
                                                        orCreateWriter.flush(true);
                                                        try {
                                                            appendRecordsToFlowFile(processSession, linkedList, atomicReference, dataFileWriter, dataFileStream);
                                                            atomicInteger2.accumulateAndGet(linkedList.size(), (i, i2) -> {
                                                                return i + i2;
                                                            });
                                                            linkedList.clear();
                                                        } catch (IOException e3) {
                                                            getLogger().error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file", e3);
                                                        }
                                                    }
                                                } catch (InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure | SerializationError e4) {
                                                    logger.error("Error writing record to Hive Streaming transaction", e4);
                                                    appendRecordsToFlowFile(processSession, Collections.singletonList(hiveStreamingRecord), atomicReference2, dataFileWriter2, dataFileStream);
                                                    if (!(e4 instanceof SerializationError)) {
                                                        try {
                                                            orCreateWriter.abort();
                                                        } catch (Exception e5) {
                                                            throw new ProcessException(e5);
                                                        }
                                                    }
                                                }
                                            } catch (ConnectionError | InterruptedException | HiveWriter.ConnectFailure e6) {
                                                logger.error("Error connecting to Hive endpoint: table {} at {}", new Object[]{this.options.getTableName(), this.options.getMetaStoreURI()});
                                                abortAndCloseWriters();
                                                throw new ProcessException(e6);
                                            }
                                        } catch (IOException e7) {
                                            logger.error("Error writing record to Hive Streaming transaction", e7);
                                            appendRecordsToFlowFile(processSession, Collections.singletonList(new HiveStreamingRecord(null, genericRecord)), atomicReference2, dataFileWriter2, dataFileStream);
                                        }
                                    }
                                } catch (IOException e8) {
                                    logger.error("Error writing record to Hive Streaming transaction", e8);
                                    appendRecordsToFlowFile(processSession, Collections.singletonList(new HiveStreamingRecord(null, genericRecord)), atomicReference2, dataFileWriter2, dataFileStream);
                                }
                            }
                            try {
                                flushAllWriters(true);
                                closeAllWriters();
                                appendRecordsToFlowFile(processSession, linkedList, atomicReference, dataFileWriter, dataFileStream);
                                atomicInteger2.accumulateAndGet(linkedList.size(), (i3, i4) -> {
                                    return i3 + i4;
                                });
                                linkedList.clear();
                            } catch (InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure e9) {
                                appendRecordsToFlowFile(processSession, linkedList, atomicReference2, dataFileWriter2, dataFileStream);
                            }
                            if (dataFileStream != null) {
                                if (0 != 0) {
                                    try {
                                        dataFileStream.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    dataFileStream.close();
                                }
                            }
                        } finally {
                        }
                    } catch (IOException e10) {
                        logger.error("The incoming flow file can not be read as an Avro file, routing to failure", e10);
                        atomicBoolean.set(true);
                    }
                });
                if (atomicInteger.get() > 0) {
                    if (atomicInteger2.get() > 0) {
                        atomicReference.set(processSession.putAttribute((FlowFile) atomicReference.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(atomicInteger.get())));
                        processSession.getProvenanceReporter().send((FlowFile) atomicReference.get(), this.options.getMetaStoreURI());
                        processSession.transfer((FlowFile) atomicReference.get(), REL_SUCCESS);
                    } else {
                        processSession.remove((FlowFile) atomicReference.get());
                    }
                    if (atomicInteger.get() != atomicInteger2.get()) {
                        atomicReference2.set(processSession.putAttribute((FlowFile) atomicReference2.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(atomicInteger.get() - atomicInteger2.get())));
                        processSession.transfer((FlowFile) atomicReference2.get(), REL_FAILURE);
                    } else {
                        processSession.remove((FlowFile) atomicReference2.get());
                    }
                } else {
                    processSession.remove((FlowFile) atomicReference.get());
                    processSession.remove((FlowFile) atomicReference2.get());
                }
                atomicReference.set(null);
                atomicReference2.set(null);
                if (atomicBoolean.get()) {
                    processSession.transfer(flowFile, REL_FAILURE);
                } else {
                    processSession.remove(flowFile);
                }
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            } catch (ProcessException e) {
                abortAndCloseWriters();
                Throwable cause = e.getCause();
                if (cause == null) {
                    throw e;
                }
                if (!(cause instanceof ConnectionError) && !(cause instanceof HiveWriter.ConnectFailure) && !(cause instanceof HiveWriter.CommitFailure) && !(cause instanceof HiveWriter.TxnBatchFailure) && !(cause instanceof HiveWriter.TxnFailure) && !(cause instanceof InterruptedException)) {
                    throw e;
                }
                logger.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", cause);
                processSession.transfer(processSession.penalize(flowFile), REL_RETRY);
                if (atomicReference.get() != null) {
                    processSession.remove((FlowFile) atomicReference.get());
                }
                if (atomicReference2.get() != null) {
                    processSession.remove((FlowFile) atomicReference2.get());
                }
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            }
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    private void appendRecordsToFlowFile(ProcessSession processSession, List<HiveStreamingRecord> list, AtomicReference<FlowFile> atomicReference, DataFileWriter<GenericRecord> dataFileWriter, DataFileStream<GenericRecord> dataFileStream) throws IOException {
        atomicReference.set(processSession.append(atomicReference.get(), outputStream -> {
            DataFileWriter create = dataFileWriter.create(dataFileStream.getSchema(), outputStream);
            Throwable th = null;
            try {
                try {
                    Iterator it = list.iterator();
                    while (it.hasNext()) {
                        create.append(((HiveStreamingRecord) it.next()).getRecord());
                    }
                    create.flush();
                    if (create != null) {
                        if (0 == 0) {
                            create.close();
                            return;
                        }
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (create != null) {
                    if (th != null) {
                        try {
                            create.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th4;
            }
        }));
    }

    @OnStopped
    public void cleanup() {
        ComponentLog logger = getLogger();
        this.sendHeartBeat.set(false);
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().flushAndClose();
            } catch (Exception e) {
                logger.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        this.callTimeoutPool.shutdown();
        while (!this.callTimeoutPool.isTerminated()) {
            try {
                this.callTimeoutPool.awaitTermination(this.options.getCallTimeOut().intValue(), TimeUnit.MILLISECONDS);
            } catch (Throwable th) {
                logger.warn("shutdown interrupted on " + this.callTimeoutPool, th);
            }
        }
        this.callTimeoutPool = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupHeartBeatTimer() {
        if (this.options.getHeartBeatInterval().intValue() > 0) {
            final ComponentLog logger = getLogger();
            this.heartBeatTimer.schedule(new TimerTask() { // from class: org.apache.nifi.processors.hive.PutHiveStreaming.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    try {
                        if (PutHiveStreaming.this.sendHeartBeat.get()) {
                            logger.debug("Start sending heartbeat on all writers");
                            PutHiveStreaming.this.sendHeartBeatOnAllWriters();
                            PutHiveStreaming.this.setupHeartBeatTimer();
                        }
                    } catch (Exception e) {
                        logger.warn("Failed to heartbeat on HiveWriter ", e);
                    }
                }
            }, this.options.getHeartBeatInterval().intValue() * 1000);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendHeartBeatOnAllWriters() throws InterruptedException {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().heartBeat();
        }
    }

    private void flushAllWriters(boolean z) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        Iterator<HiveWriter> it = this.allWriters.values().iterator();
        while (it.hasNext()) {
            it.next().flush(z);
        }
    }

    private void abortAndCloseWriters() {
        try {
            abortAllWriters();
            closeAllWriters();
        } catch (Exception e) {
            getLogger().warn("unable to close hive connections. ", e);
        }
    }

    private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            try {
                entry.getValue().abort();
            } catch (Exception e) {
                getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e);
            }
        }
    }

    private void closeAllWriters() {
        Iterator<Map.Entry<HiveEndPoint, HiveWriter>> it = this.allWriters.entrySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().getValue().close();
            } catch (Exception e) {
                getLogger().warn("unable to close writers. ", e);
            }
        }
        this.allWriters.clear();
    }

    private HiveWriter getOrCreateWriter(HiveEndPoint hiveEndPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        ComponentLog logger = getLogger();
        try {
            HiveWriter hiveWriter = this.allWriters.get(hiveEndPoint);
            if (hiveWriter == null) {
                logger.debug("Creating Writer to Hive end point : " + hiveEndPoint);
                hiveWriter = makeHiveWriter(hiveEndPoint, this.callTimeoutPool, this.ugi, this.options);
                if (this.allWriters.size() > this.options.getMaxOpenConnections().intValue() - 1) {
                    logger.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{Integer.valueOf(this.allWriters.size()), this.options.getMaxOpenConnections()});
                    if (retireIdleWriters() == 0) {
                        retireEldestWriter();
                    }
                }
                this.allWriters.put(hiveEndPoint, hiveWriter);
                HiveUtils.logAllHiveEndPoints(this.allWriters);
            }
            return hiveWriter;
        } catch (HiveWriter.ConnectFailure e) {
            logger.error("Failed to create HiveWriter for endpoint: " + hiveEndPoint, e);
            throw e;
        }
    }

    private void retireEldestWriter() {
        ComponentLog logger = getLogger();
        logger.info("Attempting close eldest writers");
        long currentTimeMillis = System.currentTimeMillis();
        HiveEndPoint hiveEndPoint = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (entry.getValue().getLastUsed() < currentTimeMillis) {
                hiveEndPoint = entry.getKey();
                currentTimeMillis = entry.getValue().getLastUsed();
            }
        }
        try {
            logger.info("Closing least used Writer to Hive end point : " + hiveEndPoint);
            this.allWriters.remove(hiveEndPoint).flushAndClose();
        } catch (IOException e) {
            logger.warn("Failed to close writer for end point: " + hiveEndPoint, e);
        } catch (InterruptedException e2) {
            logger.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e2);
            Thread.currentThread().interrupt();
        } catch (Exception e3) {
            logger.warn("Interrupted when attempting to close writer for end point: " + hiveEndPoint, e3);
        }
    }

    private int retireIdleWriters() {
        ComponentLog logger = getLogger();
        logger.info("Attempting to close idle HiveWriters");
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : this.allWriters.entrySet()) {
            if (currentTimeMillis - entry.getValue().getLastUsed() > this.options.getIdleTimeout().intValue()) {
                i++;
                arrayList.add(entry.getKey());
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            HiveEndPoint hiveEndPoint = (HiveEndPoint) it.next();
            try {
                logger.info("Closing idle Writer to Hive end point : {}", new Object[]{hiveEndPoint});
                this.allWriters.remove(hiveEndPoint).flushAndClose();
            } catch (IOException e) {
                logger.warn("Failed to close HiveWriter for end point: {}. Error: " + hiveEndPoint, e);
            } catch (InterruptedException e2) {
                logger.warn("Interrupted when attempting to close HiveWriter for end point: " + hiveEndPoint, e2);
                Thread.currentThread().interrupt();
            } catch (Exception e3) {
                logger.warn("Interrupted when attempting to close HiveWriter for end point: " + hiveEndPoint, e3);
            }
        }
        return i;
    }

    protected HiveEndPoint makeHiveEndPoint(List<String> list, HiveOptions hiveOptions) throws ConnectionError {
        return HiveUtils.makeEndPoint(list, hiveOptions);
    }

    protected HiveWriter makeHiveWriter(HiveEndPoint hiveEndPoint, ExecutorService executorService, UserGroupInformation userGroupInformation, HiveOptions hiveOptions) throws HiveWriter.ConnectFailure, InterruptedException {
        return HiveUtils.makeHiveWriter(hiveEndPoint, executorService, userGroupInformation, hiveOptions);
    }

    protected KerberosProperties getKerberosProperties() {
        return this.kerberosProperties;
    }

    static {
        HashSet hashSet = new HashSet();
        hashSet.add("avro.schema");
        hashSet.add("avro.codec");
        RESERVED_METADATA = Collections.unmodifiableSet(hashSet);
        METASTORE_URI = new PropertyDescriptor.Builder().name("hive-stream-metastore-uri").displayName("Hive Metastore URI").description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.").required(true).addValidator(StandardValidators.URI_VALIDATOR).addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))).build();
        HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("hive-config-resources").displayName("Hive Configuration Resources").description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication with Kerberos e.g., the appropriate properties must be set in the configuration files. Please see the Hive documentation for more details.").required(false).addValidator(HiveUtils.createMultipleFilesExistValidator()).build();
        DB_NAME = new PropertyDescriptor.Builder().name("hive-stream-database-name").displayName("Database Name").description("The name of the database in which to put the data.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TABLE_NAME = new PropertyDescriptor.Builder().name("hive-stream-table-name").displayName("Table Name").description("The name of the database table in which to put the data.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        PARTITION_COLUMNS = new PropertyDescriptor.Builder().name("hive-stream-partition-cols").displayName("Partition Columns").description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must correspond exactly to the order of partition columns specified during the table creation.").required(false).expressionLanguageSupported(false).addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("[^,]+(,[^,]+)*"))).build();
        AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder().name("hive-stream-autocreate-partition").displayName("Auto-Create Partitions").description("Flag indicating whether partitions should be automatically created").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder().name("hive-stream-max-open-connections").displayName("Max Open Connections").description("The maximum number of open connections that can be allocated from this pool at the same time, or negative for no limit.").defaultValue("8").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).sensitive(false).build();
        HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder().name("hive-stream-heartbeat-interval").displayName("Heartbeat Interval").description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. A value of 0 indicates that no heartbeat should be sent.").defaultValue("60").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).sensitive(false).build();
        TXNS_PER_BATCH = new PropertyDescriptor.Builder().name("hive-stream-transactions-per-batch").displayName("Transactions per Batch").description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.").required(true).expressionLanguageSupported(true).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("100").build();
        REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.").build();
        REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.").build();
        REL_RETRY = new Relationship.Builder().name("retry").description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This can be used to provide a retry capability since full rollback is not possible.").build();
    }
}
