package org.opencord.kafka.integrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.onosproject.net.behaviour.BngProgrammable;
import org.onosproject.net.pi.runtime.PiCounterCellData;
import org.opencord.bng.BngAttachment;
import org.opencord.bng.BngStatsEvent;
import org.opencord.bng.BngStatsEventListener;
import org.opencord.bng.BngStatsEventSubject;
import org.opencord.bng.BngStatsService;
import org.opencord.kafka.EventBusService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;

/**
 * OSGi component that listens for {@link BngStatsEvent}s and publishes each one,
 * serialized as a flat JSON object, on the {@code bng.stats} topic of the
 * {@link EventBusService}.
 *
 * <p>The {@link BngStatsService} dependency is optional and dynamic: the listener is
 * registered and unregistered through the bind/unbind callbacks, which delegate to
 * the helpers inherited from {@link AbstractKafkaIntegration}.
 */
@Component(immediate = true)
public class BngStatsKafkaIntegration extends AbstractKafkaIntegration {

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected EventBusService eventBusService;

    // Carrier for the @Reference declaration only; this class never reads the field.
    // The bound service is handed to the bind/unbind callbacks below, which track it
    // through bngStatsServiceRef.
    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
            policy = ReferencePolicy.DYNAMIC,
            bind = "bindBngStatsService",
            unbind = "unbindBngStatsService")
    protected volatile BngStatsService ignore;

    /** Topic on which serialized statistics are published. */
    private static final String TOPIC_STATS = "bng.stats";

    // JSON field names describing the attachment.
    private static final String SUBSCRIBER_S_TAG = "sTag";
    private static final String SUBSCRIBER_C_TAG = "cTag";
    private static final String CONTROL_PACKETS = "controlPackets";
    private static final String TIMESTAMP = "timestamp";
    private static final String ATTACHMENT_TYPE = "attachmentType";
    private static final String DEVICE_ID = "deviceId";
    private static final String PORT_NUMBER = "portNumber";
    private static final String MAC_ADDRESS = "macAddress";
    private static final String IP_ADDRESS = "ipAddress";
    private static final String ONU_SERIAL_NUMBER = "onuSerialNumber";
    private static final String PPPOE_SESSION_ID = "pppoeSessionId";

    // JSON field names for the per-counter byte/packet values.
    private static final String UP_RX_BYTES = "upRxBytes";
    private static final String UP_RX_PACKETS = "upRxPackets";
    private static final String UP_TX_BYTES = "upTxBytes";
    private static final String UP_TX_PACKETS = "upTxPackets";
    private static final String UP_DROP_BYTES = "upDropBytes";
    private static final String UP_DROP_PACKETS = "upDropPackets";
    private static final String DOWN_RX_BYTES = "downRxBytes";
    private static final String DOWN_RX_PACKETS = "downRxPackets";
    private static final String DOWN_TX_BYTES = "downTxBytes";
    private static final String DOWN_TX_PACKETS = "downTxPackets";
    private static final String DOWN_DROP_BYTES = "downDropBytes";
    private static final String DOWN_DROP_PACKETS = "downDropPackets";

    /**
     * Maps each data-plane counter type to the pair of JSON field names
     * (left = bytes field, right = packets field) used when serializing it.
     */
    private static final ImmutableMap<BngProgrammable.BngCounterType, Pair<String, String>> MAP_COUNTERS =
            // Explicit type witness: without it the chained builder is inferred as
            // Builder<Object, Object> and the assignment does not compile.
            ImmutableMap.<BngProgrammable.BngCounterType, Pair<String, String>>builder()
                    .put(BngProgrammable.BngCounterType.UPSTREAM_RX, Pair.of(UP_RX_BYTES, UP_RX_PACKETS))
                    .put(BngProgrammable.BngCounterType.UPSTREAM_TX, Pair.of(UP_TX_BYTES, UP_TX_PACKETS))
                    .put(BngProgrammable.BngCounterType.UPSTREAM_DROPPED, Pair.of(UP_DROP_BYTES, UP_DROP_PACKETS))
                    .put(BngProgrammable.BngCounterType.DOWNSTREAM_RX, Pair.of(DOWN_RX_BYTES, DOWN_RX_PACKETS))
                    .put(BngProgrammable.BngCounterType.DOWNSTREAM_TX, Pair.of(DOWN_TX_BYTES, DOWN_TX_PACKETS))
                    .put(BngProgrammable.BngCounterType.DOWNSTREAM_DROPPED,
                         Pair.of(DOWN_DROP_BYTES, DOWN_DROP_PACKETS))
                    .build();

    /** Shared mapper, used only as a factory for empty object nodes. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final AtomicReference<BngStatsService> bngStatsServiceRef = new AtomicReference<>();
    private final BngStatsEventListener statsListener = new InternalStatsListener();

    /**
     * Listener that serializes the subject of every received event and sends it
     * to the event bus on {@code TOPIC_STATS}.
     */
    private class InternalStatsListener implements BngStatsEventListener {

        @Override
        public void event(BngStatsEvent bngStatsEvent) {
            BngStatsEventSubject subject = (BngStatsEventSubject) bngStatsEvent.subject();
            eventBusService.send(TOPIC_STATS, serializeBngStatsEvent(subject));
        }
    }

    /**
     * Bind callback for the optional {@link BngStatsService} reference.
     *
     * @param bngStatsService the service being bound
     */
    protected void bindBngStatsService(BngStatsService bngStatsService) {
        bindAndAddListener(bngStatsService, this.bngStatsServiceRef, this.statsListener);
    }

    /**
     * Unbind callback for the optional {@link BngStatsService} reference.
     *
     * @param bngStatsService the service being unbound
     */
    protected void unbindBngStatsService(BngStatsService bngStatsService) {
        unbindAndRemoveListener(bngStatsService, this.bngStatsServiceRef, this.statsListener);
    }

    /** Logs component start-up. */
    @Activate
    public void activate() {
        this.log.info("Started BngKafkaIntegration");
    }

    /** Releases the currently tracked stats service, if any, and logs shutdown. */
    @Deactivate
    public void deactivate() {
        // NOTE(review): the reference may hold null here if the service was never
        // bound; presumably unbindAndRemoveListener tolerates that - confirm.
        unbindBngStatsService(this.bngStatsServiceRef.get());
        this.log.info("Stopped BngKafkaIntegration");
    }

    /**
     * Builds the JSON payload for one statistics event: the attachment's identifying
     * fields, the available counters, and the serialization timestamp.
     *
     * @param bngStatsEventSubject event subject carrying the attachment and its counters
     * @return the populated JSON object
     */
    private JsonNode serializeBngStatsEvent(BngStatsEventSubject bngStatsEventSubject) {
        ObjectNode payload = MAPPER.createObjectNode();
        Map<BngProgrammable.BngCounterType, PiCounterCellData> attachmentStats =
                bngStatsEventSubject.getAttachmentStats();
        BngAttachment attachment = bngStatsEventSubject.getBngAttachment();

        payload.put(MAC_ADDRESS, attachment.macAddress().toString());
        payload.put(IP_ADDRESS, attachment.ipAddress().toString());
        payload.put(PPPOE_SESSION_ID, attachment.pppoeSessionId());
        payload.put(SUBSCRIBER_S_TAG, attachment.sTag().toShort());
        payload.put(SUBSCRIBER_C_TAG, attachment.cTag().toShort());
        payload.put(ONU_SERIAL_NUMBER, attachment.onuSerial());
        payload.put(ATTACHMENT_TYPE, attachment.type().toString());
        payload.put(DEVICE_ID, attachment.oltConnectPoint().deviceId().toString());
        payload.put(PORT_NUMBER, attachment.oltConnectPoint().port().toString());

        createNodesStats(attachmentStats, payload);

        // The control-plane counter is not guaranteed to be in the map; omit the
        // field instead of throwing a NullPointerException inside the listener.
        PiCounterCellData controlPlane = attachmentStats.get(BngProgrammable.BngCounterType.CONTROL_PLANE);
        if (controlPlane != null) {
            payload.put(CONTROL_PACKETS, controlPlane.packets());
        }

        payload.put(TIMESTAMP, Instant.now().toString());
        return payload;
    }

    /**
     * Adds the bytes/packets fields for every counter type listed in
     * {@code MAP_COUNTERS} that has a value in {@code stats}; counter types with
     * no value are skipped.
     *
     * @param stats counter values keyed by counter type
     * @param objectNode node to populate
     * @return the same {@code objectNode} instance, for chaining
     */
    private ObjectNode createNodesStats(Map<BngProgrammable.BngCounterType, PiCounterCellData> stats,
                                        ObjectNode objectNode) {
        MAP_COUNTERS.forEach((counterType, fieldNames) -> {
            PiCounterCellData cell = stats.get(counterType);
            if (cell != null) {
                objectNode.put(fieldNames.getLeft(), cell.bytes());
                objectNode.put(fieldNames.getRight(), cell.packets());
            }
        });
        return objectNode;
    }
}
