package org.apache.nifi.processors.websocket;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.BinaryMessageConsumer;
import org.apache.nifi.websocket.ConnectedListener;
import org.apache.nifi.websocket.TextMessageConsumer;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketConnectedMessage;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;

@TriggerSerially
/* loaded from: input_file:org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor.class */
public abstract class AbstractWebSocketGatewayProcessor extends AbstractSessionFactoryProcessor implements ConnectedListener, TextMessageConsumer, BinaryMessageConsumer {
    protected volatile ComponentLog logger;
    protected volatile ProcessSessionFactory processSessionFactory;
    protected WebSocketService webSocketService;
    protected String endpointId;
    public static final Relationship REL_CONNECTED = new Relationship.Builder().name("connected").description("The WebSocket session is established").build();
    public static final Relationship REL_MESSAGE_TEXT = new Relationship.Builder().name("text message").description("The WebSocket text message output").build();
    public static final Relationship REL_MESSAGE_BINARY = new Relationship.Builder().name("binary message").description("The WebSocket binary message output").build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.processors.websocket.AbstractWebSocketGatewayProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$websocket$WebSocketMessage$Type = new int[WebSocketMessage.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$websocket$WebSocketMessage$Type[WebSocketMessage.Type.TEXT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$websocket$WebSocketMessage$Type[WebSocketMessage.Type.BINARY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/nifi/processors/websocket/AbstractWebSocketGatewayProcessor$WebSocketFunction.class */
    public interface WebSocketFunction {
        void execute(WebSocketService webSocketService) throws IOException, WebSocketConfigurationException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Set<Relationship> getAbstractRelationships() {
        HashSet hashSet = new HashSet();
        hashSet.add(REL_CONNECTED);
        hashSet.add(REL_MESSAGE_TEXT);
        hashSet.add(REL_MESSAGE_BINARY);
        return hashSet;
    }

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        this.logger = getLogger();
    }

    public void connected(WebSocketSessionInfo webSocketSessionInfo) {
        WebSocketConnectedMessage webSocketConnectedMessage = new WebSocketConnectedMessage(webSocketSessionInfo);
        webSocketSessionInfo.setTransitUri(getTransitUri(webSocketSessionInfo));
        enqueueMessage(webSocketConnectedMessage);
    }

    public void consume(WebSocketSessionInfo webSocketSessionInfo, String str) {
        WebSocketMessage webSocketMessage = new WebSocketMessage(webSocketSessionInfo);
        webSocketSessionInfo.setTransitUri(getTransitUri(webSocketSessionInfo));
        webSocketMessage.setPayload(str);
        enqueueMessage(webSocketMessage);
    }

    public void consume(WebSocketSessionInfo webSocketSessionInfo, byte[] bArr, int i, int i2) {
        WebSocketMessage webSocketMessage = new WebSocketMessage(webSocketSessionInfo);
        webSocketSessionInfo.setTransitUri(getTransitUri(webSocketSessionInfo));
        webSocketMessage.setPayload(bArr, i, i2);
        enqueueMessage(webSocketMessage);
    }

    public void onWebSocketServiceReady(WebSocketService webSocketService, ProcessContext processContext) throws IOException {
        if (webSocketService instanceof WebSocketClientService) {
            WebSocketClientService webSocketClientService = (WebSocketClientService) webSocketService;
            if (!processContext.hasIncomingConnection()) {
                webSocketClientService.connect(this.endpointId);
                return;
            }
            ProcessSession createSession = this.processSessionFactory.createSession();
            FlowFile flowFile = createSession.get();
            try {
                webSocketClientService.connect(this.endpointId, flowFile.getAttributes());
                createSession.remove(flowFile);
                createSession.commitAsync();
            } catch (Throwable th) {
                createSession.remove(flowFile);
                createSession.commitAsync();
                throw th;
            }
        }
    }

    protected void registerProcessorToService(ProcessContext processContext, WebSocketFunction webSocketFunction) throws IOException, WebSocketConfigurationException {
        this.webSocketService = getWebSocketService(processContext);
        this.endpointId = getEndpointId(processContext);
        this.webSocketService.registerProcessor(this.endpointId, this);
        webSocketFunction.execute(this.webSocketService);
    }

    protected abstract WebSocketService getWebSocketService(ProcessContext processContext);

    protected abstract String getEndpointId(ProcessContext processContext);

    protected boolean isProcessorRegisteredToService() {
        return (this.webSocketService == null || StringUtils.isEmpty(this.endpointId) || !this.webSocketService.isProcessorRegistered(this.endpointId, this)) ? false : true;
    }

    @OnStopped
    public void onStopped(ProcessContext processContext) {
        deregister();
    }

    private void deregister() {
        if (this.webSocketService == null) {
            return;
        }
        try {
            this.webSocketService.deregisterProcessor(this.endpointId, this);
            this.webSocketService = null;
        } catch (WebSocketConfigurationException e) {
            this.logger.warn("Failed to deregister processor {} due to: {}", new Object[]{this, e}, e);
        }
    }

    public final void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) {
        if (this.processSessionFactory == null) {
            this.processSessionFactory = processSessionFactory;
        }
        if (!isProcessorRegisteredToService()) {
            try {
                registerProcessorToService(processContext, webSocketService -> {
                    onWebSocketServiceReady(webSocketService, processContext);
                });
            } catch (IOException | WebSocketConfigurationException e) {
                deregister();
                processContext.yield();
                throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
            }
        } else if (processContext.hasIncomingConnection()) {
            try {
                onWebSocketServiceReady(this.webSocketService, processContext);
            } catch (IOException e2) {
                deregister();
                processContext.yield();
                throw new ProcessException("Failed to renew session and connect to WebSocket service due to: " + e2, e2);
            }
        }
        processContext.yield();
    }

    private void enqueueMessage(WebSocketMessage webSocketMessage) {
        ProcessSession createSession = this.processSessionFactory.createSession();
        try {
            FlowFile create = createSession.create();
            HashMap hashMap = new HashMap();
            hashMap.put(WebSocketProcessorAttributes.ATTR_WS_CS_ID, this.webSocketService.getIdentifier());
            WebSocketSessionInfo sessionInfo = webSocketMessage.getSessionInfo();
            hashMap.put(WebSocketProcessorAttributes.ATTR_WS_SESSION_ID, sessionInfo.getSessionId());
            hashMap.put(WebSocketProcessorAttributes.ATTR_WS_ENDPOINT_ID, this.endpointId);
            hashMap.put(WebSocketProcessorAttributes.ATTR_WS_LOCAL_ADDRESS, sessionInfo.getLocalAddress().toString());
            hashMap.put(WebSocketProcessorAttributes.ATTR_WS_REMOTE_ADDRESS, sessionInfo.getRemoteAddress().toString());
            WebSocketMessage.Type type = webSocketMessage.getType();
            if (type != null) {
                hashMap.put(WebSocketProcessorAttributes.ATTR_WS_MESSAGE_TYPE, type.name());
            }
            FlowFile putAllAttributes = createSession.putAllAttributes(create, hashMap);
            byte[] payload = webSocketMessage.getPayload();
            if (payload != null) {
                putAllAttributes = createSession.write(putAllAttributes, outputStream -> {
                    outputStream.write(payload, webSocketMessage.getOffset(), webSocketMessage.getLength());
                });
            }
            createSession.getProvenanceReporter().receive(putAllAttributes, getTransitUri(sessionInfo));
            if (!(webSocketMessage instanceof WebSocketConnectedMessage)) {
                switch (AnonymousClass1.$SwitchMap$org$apache$nifi$websocket$WebSocketMessage$Type[((WebSocketMessage.Type) Objects.requireNonNull(type)).ordinal()]) {
                    case 1:
                        createSession.transfer(putAllAttributes, REL_MESSAGE_TEXT);
                        break;
                    case 2:
                        createSession.transfer(putAllAttributes, REL_MESSAGE_BINARY);
                        break;
                }
            } else {
                createSession.transfer(putAllAttributes, REL_CONNECTED);
            }
            createSession.commitAsync();
        } catch (Exception e) {
            this.logger.error("Unable to fully process input due to " + e, e);
            createSession.rollback();
        }
    }

    protected abstract String getTransitUri(WebSocketSessionInfo webSocketSessionInfo);
}
