/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.websocket;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.websocket.BinaryMessageConsumer;
import org.apache.nifi.websocket.ConnectedListener;
import org.apache.nifi.websocket.TextMessageConsumer;
import org.apache.nifi.websocket.WebSocketClientService;
import org.apache.nifi.websocket.WebSocketConfigurationException;
import org.apache.nifi.websocket.WebSocketConnectedMessage;
import org.apache.nifi.websocket.WebSocketMessage;
import org.apache.nifi.websocket.WebSocketService;
import org.apache.nifi.websocket.WebSocketSessionInfo;

@TriggerSerially
public abstract class AbstractWebSocketGatewayProcessor
extends AbstractSessionFactoryProcessor
implements ConnectedListener,
TextMessageConsumer,
BinaryMessageConsumer {
    protected volatile ComponentLog logger;
    protected volatile ProcessSessionFactory processSessionFactory;
    protected WebSocketService webSocketService;
    protected String endpointId;
    public static final Relationship REL_CONNECTED = new Relationship.Builder().name("connected").description("The WebSocket session is established").build();
    public static final Relationship REL_MESSAGE_TEXT = new Relationship.Builder().name("text message").description("The WebSocket text message output").build();
    public static final Relationship REL_MESSAGE_BINARY = new Relationship.Builder().name("binary message").description("The WebSocket binary message output").build();

    static Set<Relationship> getAbstractRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_CONNECTED);
        relationships.add(REL_MESSAGE_TEXT);
        relationships.add(REL_MESSAGE_BINARY);
        return relationships;
    }

    protected void init(ProcessorInitializationContext context) {
        this.logger = this.getLogger();
    }

    public void connected(WebSocketSessionInfo sessionInfo) {
        WebSocketConnectedMessage message = new WebSocketConnectedMessage(sessionInfo);
        sessionInfo.setTransitUri(this.getTransitUri(sessionInfo));
        this.enqueueMessage((WebSocketMessage)message);
    }

    public void consume(WebSocketSessionInfo sessionInfo, String messageStr) {
        WebSocketMessage message = new WebSocketMessage(sessionInfo);
        sessionInfo.setTransitUri(this.getTransitUri(sessionInfo));
        message.setPayload(messageStr);
        this.enqueueMessage(message);
    }

    public void consume(WebSocketSessionInfo sessionInfo, byte[] payload, int offset, int length) {
        WebSocketMessage message = new WebSocketMessage(sessionInfo);
        sessionInfo.setTransitUri(this.getTransitUri(sessionInfo));
        message.setPayload(payload, offset, length);
        this.enqueueMessage(message);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onWebSocketServiceReady(WebSocketService webSocketService, ProcessContext context) throws IOException {
        if (webSocketService instanceof WebSocketClientService) {
            WebSocketClientService webSocketClientService = (WebSocketClientService)webSocketService;
            if (context.hasIncomingConnection()) {
                ProcessSession session = this.processSessionFactory.createSession();
                FlowFile flowFile = session.get();
                try {
                    webSocketClientService.connect(this.endpointId, flowFile.getAttributes());
                }
                finally {
                    session.remove(flowFile);
                    session.commitAsync();
                }
            } else {
                webSocketClientService.connect(this.endpointId);
            }
        }
    }

    protected void registerProcessorToService(ProcessContext context, WebSocketFunction afterRegistration) throws IOException, WebSocketConfigurationException {
        this.webSocketService = this.getWebSocketService(context);
        this.endpointId = this.getEndpointId(context);
        this.webSocketService.registerProcessor(this.endpointId, (Processor)this);
        afterRegistration.execute(this.webSocketService);
    }

    protected abstract WebSocketService getWebSocketService(ProcessContext var1);

    protected abstract String getEndpointId(ProcessContext var1);

    protected boolean isProcessorRegisteredToService() {
        return this.webSocketService != null && !StringUtils.isEmpty((String)this.endpointId) && this.webSocketService.isProcessorRegistered(this.endpointId, (Processor)this);
    }

    @OnStopped
    public void onStopped(ProcessContext context) {
        this.deregister();
    }

    private void deregister() {
        if (this.webSocketService == null) {
            return;
        }
        try {
            this.webSocketService.deregisterProcessor(this.endpointId, (Processor)this);
            this.webSocketService = null;
        }
        catch (WebSocketConfigurationException e) {
            this.logger.warn("Failed to deregister processor {} due to: {}", new Object[]{this, e}, (Throwable)e);
        }
    }

    public final void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
        if (this.processSessionFactory == null) {
            this.processSessionFactory = sessionFactory;
        }
        if (!this.isProcessorRegisteredToService()) {
            this.register(context);
        } else if (this.webSocketService instanceof WebSocketClientService && context.hasIncomingConnection()) {
            this.deregister();
            this.register(context);
        }
        context.yield();
    }

    private void register(ProcessContext context) {
        try {
            this.registerProcessorToService(context, webSocketService -> this.onWebSocketServiceReady(webSocketService, context));
        }
        catch (IOException | WebSocketConfigurationException e) {
            this.deregister();
            context.yield();
            throw new ProcessException("Failed to register processor to WebSocket service due to: " + e, e);
        }
    }

    private void enqueueMessage(WebSocketMessage incomingMessage) {
        ProcessSession session = this.processSessionFactory.createSession();
        try {
            FlowFile messageFlowFile = session.create();
            HashMap<String, String> attrs = new HashMap<String, String>();
            attrs.put("websocket.controller.service.id", this.webSocketService.getIdentifier());
            WebSocketSessionInfo sessionInfo = incomingMessage.getSessionInfo();
            attrs.put("websocket.session.id", sessionInfo.getSessionId());
            attrs.put("websocket.endpoint.id", this.endpointId);
            attrs.put("websocket.local.address", sessionInfo.getLocalAddress().toString());
            attrs.put("websocket.remote.address", sessionInfo.getRemoteAddress().toString());
            WebSocketMessage.Type messageType = incomingMessage.getType();
            if (messageType != null) {
                attrs.put("websocket.message.type", messageType.name());
            }
            messageFlowFile = session.putAllAttributes(messageFlowFile, attrs);
            byte[] payload = incomingMessage.getPayload();
            if (payload != null) {
                messageFlowFile = session.write(messageFlowFile, out -> out.write(payload, incomingMessage.getOffset(), incomingMessage.getLength()));
            }
            session.getProvenanceReporter().receive(messageFlowFile, this.getTransitUri(sessionInfo));
            if (incomingMessage instanceof WebSocketConnectedMessage) {
                session.transfer(messageFlowFile, REL_CONNECTED);
            } else {
                switch (Objects.requireNonNull(messageType)) {
                    case TEXT: {
                        session.transfer(messageFlowFile, REL_MESSAGE_TEXT);
                        break;
                    }
                    case BINARY: {
                        session.transfer(messageFlowFile, REL_MESSAGE_BINARY);
                    }
                }
            }
            session.commitAsync();
        }
        catch (Exception e) {
            this.logger.error("Unable to fully process input due to " + e, (Throwable)e);
            session.rollback();
        }
    }

    protected abstract String getTransitUri(WebSocketSessionInfo var1);

    @FunctionalInterface
    public static interface WebSocketFunction {
        public void execute(WebSocketService var1) throws IOException, WebSocketConfigurationException;
    }
}

