package org.opencord.kafka.integrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.opencord.cordmcast.CordMcastStatisticsEvent;
import org.opencord.cordmcast.CordMcastStatisticsEventListener;
import org.opencord.cordmcast.CordMcastStatisticsService;
import org.opencord.kafka.EventBusService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;

@Component(immediate = true)
/* loaded from: input_file:org/opencord/kafka/integrations/McastKafkaIntegration.class */
public class McastKafkaIntegration extends AbstractKafkaIntegration {

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected EventBusService eventBusService;

    @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, bind = "bindMcastStatisticsService", unbind = "unbindMcastStatisticsService")
    protected volatile CordMcastStatisticsService cordMcastStatisticsService;
    protected final AtomicReference<CordMcastStatisticsService> cordMcastStatisticsServiceRef = new AtomicReference<>();
    private final CordMcastStatisticsEventListener cordMcastStatisticsEventListener = new InternalCorcMcastStatisticsListener();
    protected static final String MCAST_OPERATIONAL_STATUS_TOPIC = "mcastOperationalStatus.events";
    private static final String TIMESTAMP = "timestamp";
    private static final String GROUP = "Group";
    private static final String SOURCE = "Source";
    private static final String VLAN = "Vlan";
    private static final String MCAST_EVENT_DATA = "McastEventData";

    /* loaded from: input_file:org/opencord/kafka/integrations/McastKafkaIntegration$InternalCorcMcastStatisticsListener.class */
    private class InternalCorcMcastStatisticsListener implements CordMcastStatisticsEventListener {
        private InternalCorcMcastStatisticsListener() {
        }

        public void event(CordMcastStatisticsEvent cordMcastStatisticsEvent) {
            McastKafkaIntegration.this.handleMcastStat(cordMcastStatisticsEvent);
        }
    }

    protected void bindMcastStatisticsService(CordMcastStatisticsService cordMcastStatisticsService) {
        bindAndAddListener(cordMcastStatisticsService, this.cordMcastStatisticsServiceRef, this.cordMcastStatisticsEventListener);
    }

    protected void unbindMcastStatisticsService(CordMcastStatisticsService cordMcastStatisticsService) {
        unbindAndRemoveListener(cordMcastStatisticsService, this.cordMcastStatisticsServiceRef, this.cordMcastStatisticsEventListener);
    }

    @Activate
    public void activate() {
        this.log.info("Started McastKafkaIntegration");
    }

    @Deactivate
    public void deactivate() {
        this.log.info("Stopped McastKafkaIntegration");
    }

    private void handleMcastStat(CordMcastStatisticsEvent cordMcastStatisticsEvent) {
        this.eventBusService.send(MCAST_OPERATIONAL_STATUS_TOPIC, serializeMcastStat(cordMcastStatisticsEvent));
        this.log.debug("CordMcastStatisticsEvent {} sent successfully", cordMcastStatisticsEvent);
    }

    private JsonNode serializeMcastStat(CordMcastStatisticsEvent cordMcastStatisticsEvent) {
        this.log.debug("Serializing AuthenticationStatisticsEvent");
        ObjectMapper objectMapper = new ObjectMapper();
        ObjectNode createObjectNode = objectMapper.createObjectNode();
        createObjectNode.put(TIMESTAMP, Instant.now().toString());
        ArrayNode putArray = createObjectNode.putArray(MCAST_EVENT_DATA);
        ((List) cordMcastStatisticsEvent.subject()).forEach(cordMcastStatistics -> {
            ObjectNode createObjectNode2 = objectMapper.createObjectNode();
            if (cordMcastStatistics.getGroupAddress() != null) {
                createObjectNode2.put(GROUP, cordMcastStatistics.getGroupAddress().toString());
            }
            if (cordMcastStatistics.getSourceAddress() != null) {
                createObjectNode2.put(SOURCE, cordMcastStatistics.getSourceAddress().toString());
            }
            createObjectNode2.put(VLAN, cordMcastStatistics.getVlanId().toShort());
            putArray.add(createObjectNode2);
        });
        return createObjectNode;
    }
}
