package org.opencord.kafka.integrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import org.opencord.bng.PppoeBngControlHandler;
import org.opencord.bng.PppoeEvent;
import org.opencord.bng.PppoeEventListener;
import org.opencord.bng.PppoeEventSubject;
import org.opencord.kafka.EventBusService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;

/**
 * Forwards PPPoE events to the event bus as JSON documents.
 *
 * <p>When a {@link PppoeBngControlHandler} is bound, an internal listener is handed to the
 * superclass helper {@code bindAndAddListener}; every {@link PppoeEvent} that listener receives
 * is serialized and sent on the {@value #TOPIC_PPPOE} topic through {@link EventBusService}.
 */
@Component(immediate = true)
public class BngPppoeKafkaIntegration extends AbstractKafkaIntegration {

    /** Event bus used to publish the serialized events. */
    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected EventBusService eventBusService;

    /**
     * Carries the optional, dynamic reference declaration for the PPPoE handler. The field is
     * never read in this class; the handler currently in use is tracked in
     * {@link #pppoeBngControlRef} by the bind/unbind methods named in the annotation.
     */
    @Reference(cardinality = ReferenceCardinality.OPTIONAL, policy = ReferencePolicy.DYNAMIC, bind = "bindPppoeBngControl", unbind = "unbindPppoeBngControl")
    protected volatile PppoeBngControlHandler ignore;

    /** Topic on which PPPoE events are published. */
    protected static final String TOPIC_PPPOE = "bng.pppoe";

    // Keys of the JSON document produced by serializePppoeEvent.
    private static final String TIMESTAMP = "timestamp";
    private static final String EVENT_TYPE = "eventType";
    private static final String OLT_DEVICE_ID = "deviceId";
    private static final String OLT_PORT_NUMBER = "portNumber";
    private static final String MAC_ADDRESS = "macAddress";
    private static final String IP_ADDRESS = "ipAddress";
    private static final String SERIAL_NUMBER = "serialNumber";
    private static final String SESSION_ID = "sessionId";

    // Shared mapper: constructing an ObjectMapper is costly, and it is only used here as a
    // node factory, so a single instance is reused instead of building one per event.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Handler passed to the bind/unbind helpers of the superclass. */
    private final AtomicReference<PppoeBngControlHandler> pppoeBngControlRef = new AtomicReference<>();

    /** Listener handed to the superclass helpers when a handler is bound or unbound. */
    private final PppoeEventListener pppoeEventListener = new InternalPppoeListener();

    /** Listener that serializes each received PPPoE event and sends it on the PPPoE topic. */
    private class InternalPppoeListener implements PppoeEventListener {

        /**
         * Serializes the event and publishes it on {@link #TOPIC_PPPOE}.
         *
         * @param pppoeEvent the event to forward
         */
        public void event(PppoeEvent pppoeEvent) {
            JsonNode payload = serializePppoeEvent(pppoeEvent);
            eventBusService.send(TOPIC_PPPOE, payload);
        }
    }

    /**
     * Bind callback named in the {@code @Reference} declaration; delegates to the superclass
     * helper together with the handler reference holder and the internal listener.
     *
     * @param pppoeBngControlHandler the handler being bound
     */
    protected void bindPppoeBngControl(PppoeBngControlHandler pppoeBngControlHandler) {
        bindAndAddListener(pppoeBngControlHandler, this.pppoeBngControlRef, this.pppoeEventListener);
    }

    /**
     * Unbind callback named in the {@code @Reference} declaration; delegates to the superclass
     * helper together with the handler reference holder and the internal listener.
     *
     * @param pppoeBngControlHandler the handler being unbound
     */
    protected void unbindPppoeBngControl(PppoeBngControlHandler pppoeBngControlHandler) {
        unbindAndRemoveListener(pppoeBngControlHandler, this.pppoeBngControlRef, this.pppoeEventListener);
    }

    /** Logs component start-up. */
    @Activate
    public void activate() {
        this.log.info("Started PppoeKafkaIntegration");
    }

    /** Unbinds whatever handler is currently held, then logs component shutdown. */
    @Deactivate
    public void deactivate() {
        // NOTE(review): the held reference may be null if no handler was ever bound;
        // this relies on the superclass helper tolerating that - confirm.
        unbindPppoeBngControl(this.pppoeBngControlRef.get());
        this.log.info("Stopped PppoeKafkaIntegration");
    }

    /**
     * Builds the JSON document published for a PPPoE event.
     *
     * <p>The {@code timestamp} field is the time of serialization ({@link Instant#now()}), not a
     * time taken from the event. The {@code sessionId} field is an empty string when the
     * subject's session id is {@code 0}, otherwise its decimal representation.
     *
     * @param pppoeEvent the event to serialize
     * @return an object node holding the event type and the subject's attributes
     */
    private JsonNode serializePppoeEvent(PppoeEvent pppoeEvent) {
        // Read the subject once instead of re-fetching and re-casting it for every field.
        PppoeEventSubject subject = (PppoeEventSubject) pppoeEvent.subject();

        ObjectNode json = MAPPER.createObjectNode();
        json.put(TIMESTAMP, Instant.now().toString());
        json.put(EVENT_TYPE, pppoeEvent.type().toString());
        json.put(OLT_DEVICE_ID, subject.getOltConnectPoint().deviceId().toString());
        json.put(OLT_PORT_NUMBER, subject.getOltConnectPoint().port().toString());
        json.put(MAC_ADDRESS, subject.getMacAddress().toString());
        json.put(IP_ADDRESS, subject.getIpAddress().toString());
        json.put(SERIAL_NUMBER, subject.getOnuSerialNumber());
        json.put(SESSION_ID, subject.getSessionId() == 0 ? "" : String.valueOf((int) subject.getSessionId()));
        return json;
    }
}
