package org.opencord.kafka.integrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.device.DeviceService;
import org.opencord.aaa.AaaStatistics;
import org.opencord.aaa.AuthenticationEvent;
import org.opencord.aaa.AuthenticationEventListener;
import org.opencord.aaa.AuthenticationService;
import org.opencord.aaa.AuthenticationStatisticsEvent;
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
import org.opencord.kafka.EventBusService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;

@Component(immediate = true)
/* loaded from: input_file:org/opencord/kafka/integrations/AaaKafkaIntegration.class */
public class AaaKafkaIntegration extends AbstractKafkaIntegration {

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected EventBusService eventBusService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected DeviceService deviceService;

    @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, bind = "bindAuthenticationService", unbind = "unbindAuthenticationService")
    protected volatile AuthenticationService ignore;

    @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, bind = "bindAuthenticationStatService", unbind = "unbindAuthenticationStatService")
    protected volatile AuthenticationStatisticsService ignore2;
    private static final String TOPIC = "authentication.events";
    private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
    private static final String TIMESTAMP = "timestamp";
    private static final String DEVICE_ID = "deviceId";
    private static final String PORT_NUMBER = "portNumber";
    private static final String SERIAL_NUMBER = "serialNumber";
    private static final String AUTHENTICATION_STATE = "authenticationState";
    private static final String ACCEPT_RESPONSES_RX = "acceptResponsesRx";
    private static final String REJECT_RESPONSES_RX = "rejectResponsesRx";
    private static final String CHALLENGE_RESPONSES_RX = "challengeResponsesRx";
    private static final String ACCESS_REQUESTS_TX = "accessRequestsTx";
    private static final String INVALID_VALIDATORS_RX = "invalidValidatorsRx";
    private static final String UNKNOWN_TYPE_RX = "unknownTypeRx";
    private static final String PENDING_REQUESTS = "pendingRequests";
    private static final String DROPPED_RESPONSES_RX = "droppedResponsesRx";
    private static final String MALFORMED_RESPONSES_RX = "malformedResponsesRx";
    private static final String UNKNOWN_SERVER_RX = "unknownServerRx";
    private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
    private static final String REQUEST_RE_TX = "requestReTx";
    private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
    private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
    private final AuthenticationEventListener listener = new InternalAuthenticationListener();
    private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener = new InternalAuthenticationStatisticsListner();

    /* loaded from: input_file:org/opencord/kafka/integrations/AaaKafkaIntegration$InternalAuthenticationListener.class */
    private class InternalAuthenticationListener implements AuthenticationEventListener {
        private InternalAuthenticationListener() {
        }

        public void event(AuthenticationEvent authenticationEvent) {
            AaaKafkaIntegration.this.handle(authenticationEvent);
        }
    }

    /* loaded from: input_file:org/opencord/kafka/integrations/AaaKafkaIntegration$InternalAuthenticationStatisticsListner.class */
    private class InternalAuthenticationStatisticsListner implements AuthenticationStatisticsEventListener {
        private InternalAuthenticationStatisticsListner() {
        }

        public void event(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
            AaaKafkaIntegration.this.handleStat(authenticationStatisticsEvent);
        }
    }

    protected void bindAuthenticationService(AuthenticationService authenticationService) {
        bindAndAddListener(authenticationService, this.authServiceRef, this.listener);
    }

    protected void unbindAuthenticationService(AuthenticationService authenticationService) {
        unbindAndRemoveListener(authenticationService, this.authServiceRef, this.listener);
    }

    protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
        bindAndAddListener(authenticationStatisticsService, this.authStatServiceRef, this.authenticationStatisticsEventListener);
    }

    protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
        unbindAndRemoveListener(authenticationStatisticsService, this.authStatServiceRef, this.authenticationStatisticsEventListener);
    }

    @Activate
    public void activate() {
        this.log.info("Started AaaKafkaIntegration");
    }

    @Deactivate
    public void deactivate() {
        unbindAuthenticationService(this.authServiceRef.get());
        unbindAuthenticationStatService(this.authStatServiceRef.get());
        this.log.info("Stopped AaaKafkaIntegration");
    }

    private void handle(AuthenticationEvent authenticationEvent) {
        this.eventBusService.send(TOPIC, serialize(authenticationEvent));
    }

    private void handleStat(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
        this.eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(authenticationStatisticsEvent));
        this.log.debug("AuthenticationStatisticsEvent sent successfully");
    }

    private JsonNode serialize(AuthenticationEvent authenticationEvent) {
        String value = this.deviceService.getPort((ConnectPoint) authenticationEvent.subject()).annotations().value("portName");
        ObjectNode createObjectNode = new ObjectMapper().createObjectNode();
        createObjectNode.put(TIMESTAMP, Instant.now().toString());
        createObjectNode.put(DEVICE_ID, ((ConnectPoint) authenticationEvent.subject()).deviceId().toString());
        createObjectNode.put(PORT_NUMBER, ((ConnectPoint) authenticationEvent.subject()).port().toString());
        createObjectNode.put(SERIAL_NUMBER, value);
        createObjectNode.put(AUTHENTICATION_STATE, authenticationEvent.type().toString());
        return createObjectNode;
    }

    private JsonNode serializeStat(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
        this.log.debug("Serializing AuthenticationStatisticsEvent");
        ObjectNode createObjectNode = new ObjectMapper().createObjectNode();
        createObjectNode.put(TIMESTAMP, Instant.now().toString());
        createObjectNode.put(ACCEPT_RESPONSES_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getAcceptResponsesRx());
        createObjectNode.put(REJECT_RESPONSES_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getRejectResponsesRx());
        createObjectNode.put(CHALLENGE_RESPONSES_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getChallengeResponsesRx());
        createObjectNode.put(ACCESS_REQUESTS_TX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getAccessRequestsTx());
        createObjectNode.put(INVALID_VALIDATORS_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getInvalidValidatorsRx());
        createObjectNode.put(UNKNOWN_TYPE_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getUnknownTypeRx());
        createObjectNode.put(PENDING_REQUESTS, ((AaaStatistics) authenticationStatisticsEvent.subject()).getPendingRequests());
        createObjectNode.put(DROPPED_RESPONSES_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getDroppedResponsesRx());
        createObjectNode.put(MALFORMED_RESPONSES_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getMalformedResponsesRx());
        createObjectNode.put(UNKNOWN_SERVER_RX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getUnknownServerRx());
        createObjectNode.put(REQUEST_RTT_MILLIS, ((AaaStatistics) authenticationStatisticsEvent.subject()).getRequestRttMilis());
        createObjectNode.put(REQUEST_RE_TX, ((AaaStatistics) authenticationStatisticsEvent.subject()).getRequestReTx());
        return createObjectNode;
    }
}
