package org.apache.nifi.processors.beats;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

@CapabilityDescription("Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"beats", "logstash", "elasticsearch", "log"})
@WritesAttributes({@WritesAttribute(attribute = "beats.sender", description = "Internet Protocol address of the message sender"), @WritesAttribute(attribute = "beats.port", description = "TCP port on which the Processor received messages"), @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message included for batches containing single messages"), @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json")})
/* loaded from: input_file:org/apache/nifi/processors/beats/ListenBeats.class */
public class ListenBeats extends AbstractProcessor {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL_CONTEXT_SERVICE").displayName("SSL Context Service").description("SSL Context Service is required to enable TLS for socket connections").required(false).identifiesControllerService(RestrictedSSLContextService.class).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("Client Auth").displayName("Client Authentication").description("Client authentication policy when TLS is enabled").required(false).dependsOn(SSL_CONTEXT_SERVICE, new AllowableValue[0]).allowableValues(ClientAuth.values()).defaultValue(ClientAuth.REQUIRED.name()).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Messages received successfully will be sent out this relationship.").build();
    private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(ListenerProperties.NETWORK_INTF_NAME, ListenerProperties.PORT, ListenerProperties.RECV_BUFFER_SIZE, ListenerProperties.MAX_MESSAGE_QUEUE_SIZE, ListenerProperties.MAX_SOCKET_BUFFER_SIZE, ListenerProperties.CHARSET, ListenerProperties.MAX_BATCH_SIZE, ListenerProperties.MESSAGE_DELIMITER, ListenerProperties.WORKER_THREADS, SSL_CONTEXT_SERVICE, CLIENT_AUTH));
    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
    protected volatile BlockingQueue<BatchMessage> events;
    protected volatile BlockingQueue<BatchMessage> errorEvents;
    protected volatile EventServer eventServer;
    protected volatile byte[] messageDemarcatorBytes;
    protected volatile EventBatcher<BatchMessage> eventBatcher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/beats/ListenBeats$BeatsAttributes.class */
    public enum BeatsAttributes implements FlowFileAttributeKey {
        SENDER("beats.sender"),
        PORT("beats.port"),
        SEQUENCE_NUMBER("beats.sequencenumber");

        private final String key;

        BeatsAttributes(String str) {
            this.key = str;
        }

        public String key() {
            return this.key;
        }
    }

    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        int intValue = processContext.getProperty(ListenerProperties.WORKER_THREADS).asInteger().intValue();
        int intValue2 = processContext.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(processContext.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue());
        Charset forName = Charset.forName(processContext.getProperty(ListenerProperties.CHARSET).getValue());
        int intValue3 = processContext.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger().intValue();
        this.events = new LinkedBlockingQueue(processContext.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger().intValue());
        this.errorEvents = new LinkedBlockingQueue();
        this.messageDemarcatorBytes = getMessageDemarcator(processContext).getBytes(forName);
        BeatsMessageServerFactory beatsMessageServerFactory = new BeatsMessageServerFactory(getLogger(), interfaceAddress, intValue3, this.events);
        SSLContextService asControllerService = processContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (asControllerService != null) {
            ClientAuth valueOf = ClientAuth.valueOf(processContext.getProperty(CLIENT_AUTH).getValue());
            beatsMessageServerFactory.setSslContext(asControllerService.createContext());
            beatsMessageServerFactory.setClientAuth(valueOf);
        }
        beatsMessageServerFactory.setSocketReceiveBuffer(Integer.valueOf(intValue2));
        beatsMessageServerFactory.setWorkerThreads(intValue);
        beatsMessageServerFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
        beatsMessageServerFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        beatsMessageServerFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        try {
            this.eventServer = beatsMessageServerFactory.getEventServer();
        } catch (EventException e) {
            getLogger().error("Failed to bind to [{}:{}]", new Object[]{interfaceAddress, Integer.valueOf(intValue3), e});
        }
    }

    public int getListeningPort() {
        return this.eventServer.getListeningPort();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        processEvents(processSession, getEventBatcher().getBatches(processSession, processContext.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger().intValue(), this.messageDemarcatorBytes));
    }

    @OnStopped
    public void shutdown() {
        if (this.eventServer == null) {
            getLogger().warn("Event Server not configured");
        } else {
            this.eventServer.shutdown();
        }
        this.eventBatcher = null;
    }

    private void processEvents(ProcessSession processSession, Map<String, FlowFileEventBatch<BatchMessage>> map) {
        for (Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry : map.entrySet()) {
            FlowFile flowFile = entry.getValue().getFlowFile();
            List events = entry.getValue().getEvents();
            if (flowFile.getSize() == 0 || events.size() == 0) {
                processSession.remove(flowFile);
            } else {
                FlowFile putAllAttributes = processSession.putAllAttributes(flowFile, getAttributes(entry.getValue()));
                processSession.transfer(putAllAttributes, REL_SUCCESS);
                processSession.getProvenanceReporter().receive(putAllAttributes, getTransitUri(entry.getValue()));
            }
        }
    }

    private String getTransitUri(FlowFileEventBatch<BatchMessage> flowFileEventBatch) {
        return String.format("beats://%s:%d", ((BatchMessage) flowFileEventBatch.getEvents().get(0)).getSender(), Integer.valueOf(getListeningPort()));
    }

    private Map<String, String> getAttributes(FlowFileEventBatch<BatchMessage> flowFileEventBatch) {
        List events = flowFileEventBatch.getEvents();
        String sender = ((BatchMessage) events.get(0)).getSender();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put(BeatsAttributes.SENDER.key(), sender);
        linkedHashMap.put(BeatsAttributes.PORT.key(), String.valueOf(getListeningPort()));
        linkedHashMap.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (events.size() == 1) {
            linkedHashMap.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(((BatchMessage) events.get(0)).getSequenceNumber()));
        }
        return linkedHashMap;
    }

    private String getMessageDemarcator(ProcessContext processContext) {
        return processContext.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
    }

    private EventBatcher<BatchMessage> getEventBatcher() {
        if (this.eventBatcher == null) {
            this.eventBatcher = new EventBatcher<BatchMessage>(getLogger(), this.events, this.errorEvents) { // from class: org.apache.nifi.processors.beats.ListenBeats.1
                /* JADX INFO: Access modifiers changed from: protected */
                public String getBatchKey(BatchMessage batchMessage) {
                    return batchMessage.getSender();
                }
            };
        }
        return this.eventBatcher;
    }
}
