package org.apache.nifi.processors.hadoop;

import com.google.common.base.Throwables;
import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

/* loaded from: input_file:org/apache/nifi/processors/hadoop/AbstractPutHDFS.class */
public abstract class AbstractPutHDFS extends AbstractHadoopProcessor {
    protected static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    protected static final int BUFFER_SIZE_DEFAULT = 4096;
    protected static final String REPLACE_RESOLUTION = "replace";
    protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, REPLACE_RESOLUTION, "Replaces the existing file if any.");
    protected static final String IGNORE_RESOLUTION = "ignore";
    protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, "Ignores the flow file and routes it to success.");
    protected static final String FAIL_RESOLUTION = "fail";
    protected static final AllowableValue FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, "Penalizes the flow file and routes it to failure.");
    protected static final String APPEND_RESOLUTION = "append";
    protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION, "Appends to the existing file if any, creates a new file otherwise.");
    protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder().name("Conflict Resolution Strategy").description("Indicates what should happen when a file with the same name already exists in the output directory").required(true).defaultValue(FAIL_RESOLUTION_AV.getValue()).allowableValues(new AllowableValue[]{REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV}).build();

    public void onTrigger(final ProcessContext processContext, final ProcessSession processSession) throws ProcessException {
        final FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        final FileSystem fileSystem = getFileSystem();
        final Configuration configuration = getConfiguration();
        UserGroupInformation userGroupInformation = getUserGroupInformation();
        if (configuration != null && fileSystem != null && userGroupInformation != null) {
            userGroupInformation.doAs(new PrivilegedAction<Object>() { // from class: org.apache.nifi.processors.hadoop.AbstractPutHDFS.1
                @Override // java.security.PrivilegedAction
                public Object run() {
                    FlowFile flowFile2 = flowFile;
                    try {
                        Path normalizedPath = AbstractPutHDFS.this.getNormalizedPath(processContext, AbstractHadoopProcessor.DIRECTORY, flowFile2);
                        final String value = processContext.getProperty(AbstractPutHDFS.CONFLICT_RESOLUTION).getValue();
                        final long blockSize = AbstractPutHDFS.this.getBlockSize(processContext, processSession, flowFile2, normalizedPath);
                        final int bufferSize = AbstractPutHDFS.this.getBufferSize(processContext, processSession, flowFile2);
                        final short replication = AbstractPutHDFS.this.getReplication(processContext, processSession, flowFile2, normalizedPath);
                        final CompressionCodec compressionCodec = AbstractPutHDFS.this.getCompressionCodec(processContext, configuration);
                        String attribute = compressionCodec != null ? flowFile2.getAttribute(CoreAttributes.FILENAME.key()) + compressionCodec.getDefaultExtension() : flowFile2.getAttribute(CoreAttributes.FILENAME.key());
                        final Path path = new Path(normalizedPath, "." + attribute);
                        final Path path2 = new Path(normalizedPath, attribute);
                        try {
                        } catch (FileNotFoundException e) {
                            if (!fileSystem.mkdirs(normalizedPath)) {
                                throw new IOException(normalizedPath.toString() + " could not be created");
                            }
                            AbstractPutHDFS.this.changeOwner(processContext, fileSystem, normalizedPath, flowFile);
                        }
                        if (!fileSystem.getFileStatus(normalizedPath).isDirectory()) {
                            throw new IOException(normalizedPath.toString() + " already exists and is not a directory");
                        }
                        final boolean exists = fileSystem.exists(path2);
                        if (exists) {
                            boolean z = -1;
                            switch (value.hashCode()) {
                                case -1190396462:
                                    if (value.equals(AbstractPutHDFS.IGNORE_RESOLUTION)) {
                                        z = true;
                                        break;
                                    }
                                    break;
                                case 3135262:
                                    if (value.equals(AbstractPutHDFS.FAIL_RESOLUTION)) {
                                        z = 2;
                                        break;
                                    }
                                    break;
                                case 1094496948:
                                    if (value.equals(AbstractPutHDFS.REPLACE_RESOLUTION)) {
                                        z = false;
                                        break;
                                    }
                                    break;
                            }
                            switch (z) {
                                case false:
                                    if (fileSystem.delete(path2, false)) {
                                        AbstractPutHDFS.this.getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{path2, flowFile2});
                                        break;
                                    }
                                    break;
                                case true:
                                    processSession.transfer(flowFile2, AbstractPutHDFS.this.getSuccessRelationship());
                                    AbstractPutHDFS.this.getLogger().info("transferring {} to success because file with same name already exists", new Object[]{flowFile2});
                                    return null;
                                case true:
                                    processSession.transfer(processSession.penalize(flowFile2), AbstractPutHDFS.this.getFailureRelationship());
                                    AbstractPutHDFS.this.getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{flowFile2});
                                    return null;
                            }
                        }
                        StopWatch stopWatch = new StopWatch(true);
                        processSession.read(flowFile2, new InputStreamCallback() { // from class: org.apache.nifi.processors.hadoop.AbstractPutHDFS.1.1
                            /* JADX WARN: Finally extract failed */
                            public void process(InputStream inputStream) throws IOException {
                                FSDataOutputStream fSDataOutputStream = null;
                                Path path3 = null;
                                try {
                                    if (value.equals(AbstractPutHDFS.APPEND_RESOLUTION) && exists) {
                                        fSDataOutputStream = fileSystem.append(path2, bufferSize);
                                    } else {
                                        EnumSet of = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
                                        if (AbstractPutHDFS.this.shouldIgnoreLocality(processContext, processSession)) {
                                            of.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                        }
                                        fSDataOutputStream = fileSystem.create(path, FsCreateModes.applyUMask(FsPermission.getFileDefault(), FsPermission.getUMask(fileSystem.getConf())), of, bufferSize, replication, blockSize, (Progressable) null, (Options.ChecksumOpt) null);
                                    }
                                    if (compressionCodec != null) {
                                        fSDataOutputStream = compressionCodec.createOutputStream(fSDataOutputStream);
                                    }
                                    path3 = path;
                                    StreamUtils.copy(new BufferedInputStream(inputStream), fSDataOutputStream);
                                    fSDataOutputStream.flush();
                                    if (fSDataOutputStream != null) {
                                        try {
                                            fSDataOutputStream.close();
                                        } catch (Throwable th) {
                                            if (path3 != null) {
                                                try {
                                                    fileSystem.delete(path3, false);
                                                } catch (Throwable th2) {
                                                }
                                            }
                                            throw th;
                                        }
                                    }
                                } catch (Throwable th3) {
                                    if (fSDataOutputStream != null) {
                                        try {
                                            fSDataOutputStream.close();
                                        } catch (Throwable th4) {
                                            if (path3 != null) {
                                                try {
                                                    fileSystem.delete(path3, false);
                                                } catch (Throwable th5) {
                                                }
                                            }
                                            throw th4;
                                        }
                                    }
                                    throw th3;
                                }
                            }
                        });
                        stopWatch.stop();
                        String calculateDataRate = stopWatch.calculateDataRate(flowFile2.getSize());
                        long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                        if (!value.equals(AbstractPutHDFS.APPEND_RESOLUTION) || (value.equals(AbstractPutHDFS.APPEND_RESOLUTION) && !exists)) {
                            boolean z2 = false;
                            int i = 0;
                            while (true) {
                                if (i < 10) {
                                    if (fileSystem.rename(path, path2)) {
                                        z2 = true;
                                    } else {
                                        Thread.sleep(200L);
                                        i++;
                                    }
                                }
                            }
                            if (!z2) {
                                fileSystem.delete(path, false);
                                throw new ProcessException("Copied file to HDFS but could not rename dot file " + path + " to its final filename");
                            }
                            AbstractPutHDFS.this.changeOwner(processContext, fileSystem, path2, flowFile);
                        }
                        AbstractPutHDFS.this.getLogger().info("copied {} to HDFS at {} in {} milliseconds at a rate of {}", new Object[]{flowFile2, path2, Long.valueOf(duration), calculateDataRate});
                        FlowFile putAttribute = processSession.putAttribute(processSession.putAttribute(flowFile2, CoreAttributes.FILENAME.key(), path2.getName()), AbstractHadoopProcessor.ABSOLUTE_HDFS_PATH_ATTRIBUTE, path2.getParent().toString());
                        processSession.getProvenanceReporter().send(putAttribute, path2.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory()).toString());
                        processSession.transfer(putAttribute, AbstractPutHDFS.this.getSuccessRelationship());
                        return null;
                    } catch (IOException e2) {
                        Optional findCause = AbstractPutHDFS.this.findCause(e2, GSSException.class, gSSException -> {
                            return 13 == gSSException.getMajor();
                        });
                        if (findCause.isPresent()) {
                            AbstractPutHDFS.this.getLogger().warn("An error occurred while connecting to HDFS. Rolling back session, and penalizing flow file {}", new Object[]{flowFile2.getAttribute(CoreAttributes.UUID.key()), findCause.get()});
                            processSession.rollback(true);
                            return null;
                        }
                        AbstractPutHDFS.this.getLogger().error("Failed to access HDFS due to {}", new Object[]{e2});
                        processSession.transfer(flowFile2, AbstractPutHDFS.this.getFailureRelationship());
                        return null;
                    } catch (Throwable th) {
                        if (0 != 0) {
                            try {
                                fileSystem.delete((Path) null, false);
                            } catch (Exception e3) {
                                AbstractPutHDFS.this.getLogger().error("Unable to remove temporary file {} due to {}", new Object[]{null, e3});
                            }
                        }
                        AbstractPutHDFS.this.getLogger().error("Failed to write to HDFS due to {}", new Object[]{th});
                        processSession.transfer(processSession.penalize(flowFile2), AbstractPutHDFS.this.getFailureRelationship());
                        processContext.yield();
                        return null;
                    }
                }
            });
            return;
        }
        getLogger().error("HDFS not configured properly");
        processSession.transfer(flowFile, getFailureRelationship());
        processContext.yield();
    }

    protected abstract long getBlockSize(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile, Path path);

    protected abstract int getBufferSize(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile);

    protected abstract short getReplication(ProcessContext processContext, ProcessSession processSession, FlowFile flowFile, Path path);

    protected abstract boolean shouldIgnoreLocality(ProcessContext processContext, ProcessSession processSession);

    protected abstract String getOwner(ProcessContext processContext, FlowFile flowFile);

    protected abstract String getGroup(ProcessContext processContext, FlowFile flowFile);

    protected abstract Relationship getSuccessRelationship();

    protected abstract Relationship getFailureRelationship();

    /* JADX INFO: Access modifiers changed from: private */
    public <T extends Throwable> Optional<T> findCause(Throwable th, Class<T> cls, Predicate<T> predicate) {
        Stream stream = Throwables.getCausalChain(th).stream();
        cls.getClass();
        Stream filter = stream.filter((v1) -> {
            return r1.isInstance(v1);
        });
        cls.getClass();
        return filter.map((v1) -> {
            return r1.cast(v1);
        }).filter(predicate).findFirst();
    }

    protected void changeOwner(ProcessContext processContext, FileSystem fileSystem, Path path, FlowFile flowFile) {
        try {
            String owner = getOwner(processContext, flowFile);
            String group = getGroup(processContext, flowFile);
            if (owner != null || group != null) {
                fileSystem.setOwner(path, owner, group);
            }
        } catch (Exception e) {
            getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{path, e});
        }
    }
}
