package org.apache.nifi.remote;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.admin.service.AccountDisabledException;
import org.apache.nifi.admin.service.AccountNotFoundException;
import org.apache.nifi.admin.service.AccountPendingException;
import org.apache.nifi.admin.service.AdministrationException;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.Authority;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.user.NiFiUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/StandardRootGroupPort.class */
public class StandardRootGroupPort extends AbstractPort implements RootGroupPort {
    private static final String CATEGORY = "Site to Site";
    private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class);
    private final AtomicReference<Set<String>> groupAccessControl;
    private final AtomicReference<Set<String>> userAccessControl;
    private final ProcessScheduler processScheduler;
    private final boolean secure;
    private final UserService userService;
    private final BulletinRepository bulletinRepository;
    private final EventReporter eventReporter;
    private final ProcessScheduler scheduler;
    private final Set<Relationship> relationships;
    private final BlockingQueue<FlowFileRequest> requestQueue;
    private final Set<FlowFileRequest> activeRequests;
    private final Lock requestLock;
    private boolean shutdown;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/remote/StandardRootGroupPort$FlowFileRequest.class */
    public static class FlowFileRequest {
        private final Peer peer;
        private final ServerProtocol protocol;
        private final AtomicBoolean beingServiced = new AtomicBoolean(false);
        private final long creationTime = System.currentTimeMillis();
        private final BlockingQueue<ProcessingResult> queue = new ArrayBlockingQueue(1);

        public FlowFileRequest(Peer peer, ServerProtocol serverProtocol) {
            this.peer = peer;
            this.protocol = serverProtocol;
        }

        public void setServiceBegin() {
            this.beingServiced.set(true);
        }

        public boolean isBeingServiced() {
            return this.beingServiced.get();
        }

        public BlockingQueue<ProcessingResult> getResponseQueue() {
            return this.queue;
        }

        public Peer getPeer() {
            return this.peer;
        }

        public ServerProtocol getProtocol() {
            return this.protocol;
        }

        public boolean isExpired() {
            long requestExpiration = this.protocol.getRequestExpiration() * 2;
            if (requestExpiration <= 0) {
                return false;
            }
            if (requestExpiration < 500) {
                requestExpiration = 500;
            }
            return System.currentTimeMillis() > this.creationTime + requestExpiration;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/remote/StandardRootGroupPort$ProcessingResult.class */
    public static class ProcessingResult {
        private final int fileCount;
        private final Exception problem;

        public ProcessingResult(int i) {
            this.fileCount = i;
            this.problem = null;
        }

        public ProcessingResult(Exception exc) {
            this.fileCount = 0;
            this.problem = exc;
        }

        public Exception getProblem() {
            return this.problem;
        }

        public int getFileCount() {
            return this.fileCount;
        }
    }

    /* loaded from: input_file:org/apache/nifi/remote/StandardRootGroupPort$StandardPortAuthorizationResult.class */
    public static class StandardPortAuthorizationResult implements PortAuthorizationResult {
        private final boolean isAuthorized;
        private final String explanation;

        public StandardPortAuthorizationResult(boolean z, String str) {
            this.isAuthorized = z;
            this.explanation = str;
        }

        public boolean isAuthorized() {
            return this.isAuthorized;
        }

        public String getExplanation() {
            return this.explanation;
        }
    }

    public StandardRootGroupPort(String str, String str2, ProcessGroup processGroup, final TransferDirection transferDirection, ConnectableType connectableType, UserService userService, final BulletinRepository bulletinRepository, ProcessScheduler processScheduler, boolean z) {
        super(str, str2, processGroup, connectableType, processScheduler);
        this.groupAccessControl = new AtomicReference<>(new HashSet());
        this.userAccessControl = new AtomicReference<>(new HashSet());
        this.requestQueue = new ArrayBlockingQueue(1000);
        this.activeRequests = new HashSet();
        this.requestLock = new ReentrantLock();
        this.shutdown = false;
        this.processScheduler = processScheduler;
        setScheduldingPeriod("30000 nanos");
        this.userService = userService;
        this.secure = z;
        this.bulletinRepository = bulletinRepository;
        this.scheduler = processScheduler;
        setYieldPeriod("100 millis");
        this.eventReporter = new EventReporter() { // from class: org.apache.nifi.remote.StandardRootGroupPort.1
            private static final long serialVersionUID = 1;

            public void reportEvent(Severity severity, String str3, String str4) {
                bulletinRepository.addBulletin(BulletinFactory.createBulletin(StandardRootGroupPort.this.getProcessGroup().getIdentifier(), StandardRootGroupPort.this.getIdentifier(), transferDirection == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT, StandardRootGroupPort.this.getName(), str3, severity.name(), str4));
            }
        };
        this.relationships = transferDirection == TransferDirection.RECEIVE ? Collections.singleton(AbstractPort.PORT_RELATIONSHIP) : Collections.emptySet();
    }

    public Collection<Relationship> getRelationships() {
        return this.relationships;
    }

    public boolean isTriggerWhenEmpty() {
        return true;
    }

    public void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        CommunicationsSession communicationsSession;
        try {
            FlowFileRequest poll = this.requestQueue.poll(100L, TimeUnit.MILLISECONDS);
            if (poll == null) {
                return;
            }
            poll.setServiceBegin();
            this.requestLock.lock();
            try {
                if (this.shutdown && (communicationsSession = poll.getPeer().getCommunicationsSession()) != null) {
                    communicationsSession.interrupt();
                }
                this.activeRequests.add(poll);
                this.requestLock.unlock();
                ProcessSession createSession = processSessionFactory.createSession();
                try {
                    try {
                        onTrigger(processContext, createSession, poll);
                        this.requestLock.lock();
                        try {
                            this.activeRequests.remove(poll);
                            this.requestLock.unlock();
                        } finally {
                        }
                    } catch (TransmissionDisabledException e) {
                        createSession.rollback();
                        this.requestLock.lock();
                        try {
                            this.activeRequests.remove(poll);
                            this.requestLock.unlock();
                        } finally {
                            this.requestLock.unlock();
                        }
                    } catch (Exception e2) {
                        logger.error("{} Failed to process data due to {}", new Object[]{this, e2});
                        if (logger.isDebugEnabled()) {
                            logger.error("", e2);
                        }
                        createSession.rollback();
                        this.requestLock.lock();
                        try {
                            this.activeRequests.remove(poll);
                            this.requestLock.unlock();
                        } finally {
                            this.requestLock.unlock();
                        }
                    }
                } catch (Throwable th) {
                    this.requestLock.lock();
                    try {
                        this.activeRequests.remove(poll);
                        this.requestLock.unlock();
                        throw th;
                    } finally {
                    }
                }
            } finally {
                this.requestLock.unlock();
            }
        } catch (InterruptedException e3) {
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
    }

    private void onTrigger(ProcessContext processContext, ProcessSession processSession, FlowFileRequest flowFileRequest) {
        ServerProtocol protocol = flowFileRequest.getProtocol();
        BlockingQueue<ProcessingResult> responseQueue = flowFileRequest.getResponseQueue();
        if (flowFileRequest.isExpired()) {
            String format = String.format("%s Cannot service request from %s because the request has timed out", this, flowFileRequest.getPeer());
            logger.warn(format);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format);
            responseQueue.add(new ProcessingResult((Exception) new RequestExpiredException()));
            return;
        }
        Peer peer = flowFileRequest.getPeer();
        String userDn = peer.getCommunicationsSession().getUserDn();
        logger.debug("{} Servicing request for {} (DN={})", new Object[]{this, peer, userDn});
        PortAuthorizationResult checkUserAuthorization = checkUserAuthorization(userDn);
        if (!checkUserAuthorization.isAuthorized()) {
            String format2 = String.format("%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s", this, flowFileRequest.getPeer(), flowFileRequest.getPeer().getCommunicationsSession().getUserDn(), checkUserAuthorization.getExplanation());
            logger.error(format2);
            this.eventReporter.reportEvent(Severity.ERROR, CATEGORY, format2);
            responseQueue.add(new ProcessingResult((Exception) new NotAuthorizedException(checkUserAuthorization.getExplanation())));
            return;
        }
        FlowFileCodec preNegotiatedCodec = protocol.getPreNegotiatedCodec();
        if (preNegotiatedCodec == null) {
            responseQueue.add(new ProcessingResult((Exception) new BadRequestException("None of the supported FlowFile Codecs supplied is compatible with this instance")));
            return;
        }
        try {
            int receiveFlowFiles = getConnectableType() == ConnectableType.INPUT_PORT ? receiveFlowFiles(processContext, processSession, preNegotiatedCodec, flowFileRequest) : transferFlowFiles(processContext, processSession, preNegotiatedCodec, flowFileRequest);
            processSession.commit();
            responseQueue.add(new ProcessingResult(receiveFlowFiles));
        } catch (IOException e) {
            processSession.rollback();
            responseQueue.add(new ProcessingResult(e));
        } catch (Exception e2) {
            processSession.rollback();
            responseQueue.add(new ProcessingResult(e2));
        }
    }

    private int transferFlowFiles(ProcessContext processContext, ProcessSession processSession, FlowFileCodec flowFileCodec, FlowFileRequest flowFileRequest) throws IOException, ProtocolException {
        return flowFileRequest.getProtocol().transferFlowFiles(flowFileRequest.getPeer(), processContext, processSession, flowFileCodec);
    }

    private int receiveFlowFiles(ProcessContext processContext, ProcessSession processSession, FlowFileCodec flowFileCodec, FlowFileRequest flowFileRequest) throws IOException, ProtocolException {
        return flowFileRequest.getProtocol().receiveFlowFiles(flowFileRequest.getPeer(), processContext, processSession, flowFileCodec);
    }

    public boolean isValid() {
        return (getConnectableType() == ConnectableType.INPUT_PORT && getConnections(Relationship.ANONYMOUS).isEmpty()) ? false : true;
    }

    public Collection<ValidationResult> getValidationErrors() {
        ArrayList arrayList = new ArrayList();
        if (!isValid()) {
            arrayList.add(new ValidationResult.Builder().explanation(String.format("Output connection for port '%s' is not defined.", getName())).subject(String.format("Port '%s'", getName())).valid(false).build());
        }
        return arrayList;
    }

    public boolean isTransmitting() {
        if (!isRunning()) {
            return false;
        }
        if (!this.requestQueue.isEmpty()) {
            return true;
        }
        this.requestLock.lock();
        try {
            return !this.activeRequests.isEmpty();
        } finally {
            this.requestLock.unlock();
        }
    }

    public void setGroupAccessControl(Set<String> set) {
        this.groupAccessControl.set(new HashSet((Collection) Objects.requireNonNull(set)));
    }

    public Set<String> getGroupAccessControl() {
        return Collections.unmodifiableSet(this.groupAccessControl.get());
    }

    public void setUserAccessControl(Set<String> set) {
        this.userAccessControl.set(new HashSet((Collection) Objects.requireNonNull(set)));
    }

    public Set<String> getUserAccessControl() {
        return Collections.unmodifiableSet(this.userAccessControl.get());
    }

    public void shutdown() {
        super.shutdown();
        this.requestLock.lock();
        try {
            this.shutdown = true;
            Iterator<FlowFileRequest> it = this.activeRequests.iterator();
            while (it.hasNext()) {
                CommunicationsSession communicationsSession = it.next().getPeer().getCommunicationsSession();
                if (communicationsSession != null) {
                    communicationsSession.interrupt();
                }
            }
        } finally {
            this.requestLock.unlock();
        }
    }

    public void onSchedulingStart() {
        super.onSchedulingStart();
        this.requestLock.lock();
        try {
            this.shutdown = false;
        } finally {
            this.requestLock.unlock();
        }
    }

    public PortAuthorizationResult checkUserAuthorization(String str) {
        if (!this.secure) {
            return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
        }
        if (str == null) {
            String format = String.format("%s authorization failed for user %s because the DN is unknown", this, str);
            logger.warn(format);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format);
            return new StandardPortAuthorizationResult(false, "User DN is not known");
        }
        try {
            NiFiUser checkAuthorization = this.userService.checkAuthorization(str);
            if (!checkAuthorization.getAuthorities().contains(Authority.ROLE_NIFI)) {
                String format2 = String.format("%s authorization failed for user %s because the user does not have Role NiFi", this, str);
                logger.warn(format2);
                this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format2);
                return new StandardPortAuthorizationResult(false, "User does not contain required Role: NiFi");
            }
            if (this.userAccessControl.get().contains(str)) {
                return new StandardPortAuthorizationResult(true, "User is Authorized");
            }
            String userGroup = checkAuthorization.getUserGroup();
            if (userGroup == null) {
                String format3 = String.format("%s authorization failed for user %s because the user does not have a group and is not in the set of Allowed Users for this Port", this, str);
                logger.warn(format3);
                this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format3);
                return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + toString());
            }
            if (this.groupAccessControl.get().contains(userGroup)) {
                return new StandardPortAuthorizationResult(true, "User is part of group '" + userGroup + "', which is Authorized to communicate with " + toString());
            }
            String format4 = String.format("%s authorization failed for user %s because the user is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, str);
            logger.warn(format4);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format4);
            return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + toString());
        } catch (AccountDisabledException e) {
            String format5 = String.format("%s authorization failed for user %s because the User Status is not 'ACTIVE' but instead is 'DISABLED'", this, str);
            logger.warn(format5);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format5);
            return new StandardPortAuthorizationResult(false, "User Status is 'DISABLED' rather than 'ACTIVE'");
        } catch (AdministrationException e2) {
            String format6 = String.format("%s authorization failed for user %s because ", this, str, e2);
            logger.warn(format6);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format6);
            return new StandardPortAuthorizationResult(false, "Authorization failed because " + e2);
        } catch (AccountPendingException e3) {
            String format7 = String.format("%s authorization failed for user %s because the User Status is not 'ACTIVE' but instead is 'PENDING'", this, str);
            logger.warn(format7);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format7);
            return new StandardPortAuthorizationResult(false, "User Status is 'PENDING' rather than 'ACTIVE'");
        } catch (AccountNotFoundException e4) {
            String format8 = String.format("%s authorization failed for user %s because the DN is unknown", this, str);
            logger.warn(format8);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format8);
            return new StandardPortAuthorizationResult(false, "User DN is not known");
        } catch (Exception e5) {
            String format9 = String.format("%s authorization failed for user %s because ", this, str, e5);
            logger.warn(format9);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, format9);
            return new StandardPortAuthorizationResult(false, "Authorization failed because " + e5);
        }
    }

    public int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> map) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
        if (getConnectableType() != ConnectableType.INPUT_PORT) {
            throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port");
        }
        if (!isRunning()) {
            throw new IllegalStateException("Port not running");
        }
        try {
            try {
                FlowFileRequest flowFileRequest = new FlowFileRequest(peer, serverProtocol);
                if (!this.requestQueue.offer(flowFileRequest)) {
                    throw new RequestExpiredException();
                }
                this.scheduler.registerEvent(this);
                while (!flowFileRequest.isBeingServiced()) {
                    if (flowFileRequest.isExpired()) {
                        throw new SocketTimeoutException("Read timed out");
                    }
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                    }
                }
                ProcessingResult take = flowFileRequest.getResponseQueue().take();
                Exception problem = take.getProblem();
                if (problem == null) {
                    return take.getFileCount();
                }
                throw problem;
            } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e2) {
                throw e2;
            }
        } catch (Exception e3) {
            throw new ProcessException(e3);
        } catch (ProtocolException e4) {
            throw new BadRequestException(e4);
        }
    }

    public int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> map) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
        if (getConnectableType() != ConnectableType.OUTPUT_PORT) {
            throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port");
        }
        if (!isRunning()) {
            throw new IllegalStateException("Port not running");
        }
        try {
            try {
                FlowFileRequest flowFileRequest = new FlowFileRequest(peer, serverProtocol);
                if (!this.requestQueue.offer(flowFileRequest)) {
                    throw new RequestExpiredException();
                }
                this.scheduler.registerEvent(this);
                while (!flowFileRequest.isBeingServiced()) {
                    if (flowFileRequest.isExpired()) {
                        throw new SocketTimeoutException("Read timed out");
                    }
                    try {
                        Thread.sleep(100L);
                    } catch (InterruptedException e) {
                    }
                }
                ProcessingResult take = flowFileRequest.getResponseQueue().take();
                Exception problem = take.getProblem();
                if (problem == null) {
                    return take.getFileCount();
                }
                throw problem;
            } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e2) {
                throw e2;
            }
        } catch (Exception e3) {
            throw new ProcessException(e3);
        } catch (ProtocolException e4) {
            throw new BadRequestException(e4);
        }
    }

    public SchedulingStrategy getSchedulingStrategy() {
        return SchedulingStrategy.TIMER_DRIVEN;
    }

    public boolean isSideEffectFree() {
        return false;
    }
}
