package org.fisco.bcos.sdk.eventsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilterStatus;
import org.fisco.bcos.sdk.eventsub.filter.EventLogResponse;
import org.fisco.bcos.sdk.eventsub.filter.EventPushMsgHandler;
import org.fisco.bcos.sdk.eventsub.filter.EventSubNodeRespStatus;
import org.fisco.bcos.sdk.eventsub.filter.FilterManager;
import org.fisco.bcos.sdk.eventsub.filter.ScheduleTimeConfig;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.service.GroupManagerService;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/fisco/bcos/sdk/eventsub/EventSubscribeImp.class */
/**
 * {@link EventSubscribe} implementation that registers and unregisters event-log filters for one
 * group through a {@link GroupManagerService}, and periodically re-sends filters that the
 * {@link FilterManager} reports as waiting for a request.
 *
 * <p>No synchronization is performed around {@link #start()} / {@link #stop()}.
 */
public class EventSubscribeImp implements EventSubscribe {
    private static final Logger logger = LoggerFactory.getLogger(EventSubscribeImp.class);
    private Channel channel;
    private GroupManagerService groupManagerService;
    private Integer groupId;
    private FilterManager filterManager;
    private EventPushMsgHandler msgHander;
    private EventResource eventResource;
    private boolean running = false;
    // Not final: start() replaces it when a previous stop() has shut it down.
    ScheduledThreadPoolExecutor resendSchedule = new ScheduledThreadPoolExecutor(1);

    /** Handles the node's response to a "register event log filter" request. */
    public class RegisterEventSubRespCallback extends ResponseCallback {
        FilterManager filterManager;
        EventLogFilter filter;
        String filterID;
        String registerID;

        /**
         * @param filterManager manager holding the filter and its callback
         * @param eventLogFilter the filter that was sent
         * @param str the filter id the request was sent under
         * @param str2 the register id of the subscription
         */
        public RegisterEventSubRespCallback(FilterManager filterManager, EventLogFilter eventLogFilter, String str, String str2) {
            this.filterManager = filterManager;
            this.filter = eventLogFilter;
            this.filterID = str;
            this.registerID = str2;
        }

        /**
         * Processes the register response.
         *
         * <ul>
         *   <li>Non-zero error code: the filter is put back to {@code WAITING_REQUEST} and the
         *       callback registered under this filter id is dropped.</li>
         *   <li>Zero error code: the content is parsed as an {@link EventLogResponse}; result 0
         *       moves the filter to {@code EVENT_LOG_PUSHING}, any other result removes the filter.
         *       The user callback is told the result either way.</li>
         *   <li>Any exception: the user callback receives {@code OTHER_ERROR} and the filter is
         *       removed.</li>
         * </ul>
         */
        @Override // org.fisco.bcos.sdk.channel.ResponseCallback
        public void onResponse(Response response) {
            EventSubscribeImp.logger.info(" event filter callback response, registerID: {}, filterID: {}, seq: {}, error code: {},  content: {}", this.registerID, this.filterID, response.getMessageID(), response.getErrorCode(), response.getContent());
            try {
                if (response.getErrorCode().intValue() != 0) {
                    this.filterManager.updateFilterStatus(this.filter, EventLogFilterStatus.WAITING_REQUEST, null);
                    this.filterManager.removeCallback(this.filterID);
                    return;
                }
                EventLogResponse eventLogResponse = (EventLogResponse) ObjectMapperFactory.getObjectMapper().readValue(response.getContent().trim(), EventLogResponse.class);
                if (eventLogResponse.getResult() == 0) {
                    this.filterManager.updateFilterStatus(this.filter, EventLogFilterStatus.EVENT_LOG_PUSHING, response.getCtx());
                    EventSubscribeImp.logger.info(" filter {} status changed to EVENT_LOG_PUSHING", this.filter.getFilterID());
                } else {
                    this.filterManager.removeFilter(this.registerID);
                    this.filterManager.removeCallback(this.filterID);
                }
                this.filter.getCallback().onReceiveLog(eventLogResponse.getResult(), null);
            } catch (Exception e) {
                // The trailing throwable makes SLF4J print the stack trace as well as the message.
                EventSubscribeImp.logger.error(" event filter response message exception, filterID: {}, registerID: {}, exception message: {}", this.filterID, this.registerID, e.getMessage(), e);
                try {
                    this.filter.getCallback().onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
                } finally {
                    // Clean up even if the user callback throws (it may be what got us here).
                    this.filterManager.removeFilter(this.registerID);
                    this.filterManager.removeCallback(this.filterID);
                }
            }
        }
    }

    /** Handles the node's response to an "unregister event log filter" request. */
    class UnRegisterEventSubRespCallback extends ResponseCallback {
        FilterManager filterManager;
        EventLogFilter filter;

        /**
         * @param filterManager manager holding the filter and its callback
         * @param eventLogFilter the filter being unregistered
         */
        public UnRegisterEventSubRespCallback(FilterManager filterManager, EventLogFilter eventLogFilter) {
            this.filterManager = filterManager;
            this.filter = eventLogFilter;
        }

        /**
         * Processes the unregister response. With error code 0 the content is parsed as an
         * {@link EventLogResponse}; result 0 removes the filter and its callback, and the user
         * callback is told the result either way. A non-zero error code is only logged. Any
         * exception is reported to the user callback as {@code OTHER_ERROR}.
         */
        @Override // org.fisco.bcos.sdk.channel.ResponseCallback
        public void onResponse(Response response) {
            String registerID = this.filter.getRegisterID();
            EventSubscribeImp.logger.info(" unregister event callback response, registerID: {}, seq: {}, error code: {}, content: {}", registerID, response.getMessageID(), response.getErrorCode(), response.getContent());
            try {
                if (response.getErrorCode().intValue() != 0) {
                    // Previously this case was dropped without any trace beyond the info line.
                    EventSubscribeImp.logger.warn(" unregister event request failed, registerID: {}, error code: {}", registerID, response.getErrorCode());
                    return;
                }
                EventLogResponse eventLogResponse = (EventLogResponse) ObjectMapperFactory.getObjectMapper().readValue(response.getContent().trim(), EventLogResponse.class);
                if (eventLogResponse.getResult() == 0) {
                    EventSubscribeImp.logger.info(" unregister event success");
                    this.filterManager.removeFilter(this.filter.getRegisterID());
                    this.filterManager.removeCallback(this.filter.getFilterID());
                } else {
                    EventSubscribeImp.logger.warn(" unregister event fail");
                }
                this.filter.getCallback().onReceiveLog(eventLogResponse.getResult(), null);
            } catch (Exception e) {
                EventSubscribeImp.logger.error(" unregister event response message exception, registerID: {}, exception message: {}", registerID, e.getMessage(), e);
                this.filter.getCallback().onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
            }
        }
    }

    /**
     * Wires this subscriber to the group manager's channel: the resource's message handler is
     * registered both for {@code EVENT_LOG_PUSH} messages and as a disconnect handler.
     *
     * @param groupManagerService service used to query block numbers and send group messages
     * @param eventResource supplies the shared {@link FilterManager} and push-message handler
     * @param num the group id this subscriber works on
     */
    public EventSubscribeImp(GroupManagerService groupManagerService, EventResource eventResource, Integer num) {
        this.channel = groupManagerService.getChannel();
        this.groupManagerService = groupManagerService;
        this.groupId = num;
        this.eventResource = eventResource;
        this.filterManager = eventResource.getFilterManager();
        this.msgHander = eventResource.getMsgHander();
        this.channel.addMessageHandler(MsgType.EVENT_LOG_PUSH, this.msgHander);
        this.channel.addDisconnectHandler(this.msgHander);
    }

    /** @return the event resource this subscriber was created with */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public EventResource getEventResource() {
        return this.eventResource;
    }

    /**
     * Validates the parameters against the group's latest block number, then creates a filter
     * with a fresh register id, stores it in the filter manager and sends it to the group.
     *
     * @param eventLogParams subscription parameters
     * @param eventCallback receives status codes and pushed logs
     * @return the register id, or {@code null} when the parameters are rejected (the callback is
     *     then told {@code INVALID_PARAMS})
     */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public String subscribeEvent(EventLogParams eventLogParams, EventCallback eventCallback) {
        BigInteger latestBlockNumberByGroup = this.groupManagerService.getLatestBlockNumberByGroup(this.groupId);
        logger.info(" subscribe event at block num:{}", latestBlockNumberByGroup);
        if (!eventLogParams.checkParams(latestBlockNumberByGroup)) {
            eventCallback.onReceiveLog(EventSubNodeRespStatus.INVALID_PARAMS.getStatus(), null);
            return null;
        }
        EventLogFilter eventLogFilter = new EventLogFilter();
        eventLogFilter.setRegisterID(EventSubscribe.newSeq());
        eventLogFilter.setParams(eventLogParams);
        eventLogFilter.setCallback(eventCallback);
        this.filterManager.addFilter(eventLogFilter);
        sendFilter(eventLogFilter);
        logger.info(" subscribe event, registerID: {}, filterID : {}", eventLogFilter.getRegisterID(), eventLogFilter.getFilterID());
        return eventLogFilter.getRegisterID();
    }

    /**
     * Sends an unregister request for the filter stored under the given register id. The
     * filter's callback is replaced by {@code eventCallback}, which then receives the outcome.
     *
     * <p>If the request body cannot be serialized, nothing is sent, the filter is left untouched
     * and {@code eventCallback} (when non-null) is told {@code OTHER_ERROR}.
     *
     * @param str register id returned by {@link #subscribeEvent}
     * @param eventCallback receives the unregister result
     */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public void unsubscribeEvent(String str, EventCallback eventCallback) {
        EventLogFilter filter = this.filterManager.getFilter(str);
        if (filter == null) {
            logger.info(" try to unsubscribe an nonexistent event");
            return;
        }
        // Serialize before touching the filter so a failure leaves the subscription as it was.
        String paramJsonString;
        try {
            paramJsonString = filter.getParamJsonString(String.valueOf(this.groupId), filter.getFilterID());
        } catch (JsonProcessingException e) {
            logger.error(" unsubscribe event error: {}", e.getMessage(), e);
            if (eventCallback != null) {
                eventCallback.onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
            }
            return;
        }
        filter.setCallback(eventCallback);
        this.filterManager.addCallback(filter.getFilterID(), eventCallback);
        logger.info(" unsubscribe event, registerID: {}, filterID : {}", filter.getRegisterID(), filter.getFilterID());
        EventMsg eventMsg = buildEventMsg((short) MsgType.CLIENT_UNREGISTER_EVENT_LOG.getType(), paramJsonString);
        this.groupManagerService.asyncSendMessageToGroup(this.groupId, eventMsg, new UnRegisterEventSubRespCallback(this.filterManager, filter));
    }

    /** @return whatever the filter manager reports as the currently subscribed filters */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public List<EventLogFilter> getAllSubscribedEvent() {
        return this.filterManager.getAllSubscribedEvent();
    }

    /**
     * Starts the periodic re-send task (first run immediately, then every
     * {@code ScheduleTimeConfig.resendFrequency} milliseconds). Does nothing when already
     * running. Can be called again after {@link #stop()}.
     */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public void start() {
        if (this.running) {
            return;
        }
        // A shut-down executor rejects new tasks, so a restart needs a fresh one.
        if (this.resendSchedule.isShutdown()) {
            this.resendSchedule = new ScheduledThreadPoolExecutor(1);
        }
        this.resendSchedule.scheduleAtFixedRate(this::resendWaitingFilters, 0L, ScheduleTimeConfig.resendFrequency, TimeUnit.MILLISECONDS);
        this.running = true;
    }

    /** Stops the periodic re-send task. Does nothing when not running. */
    @Override // org.fisco.bcos.sdk.eventsub.EventSubscribe
    public void stop() {
        if (this.running) {
            this.running = false;
            this.resendSchedule.shutdown();
        }
    }

    /**
     * Re-sends every filter the manager reports as waiting. Never lets an exception escape:
     * a throwing periodic task would have all its later runs suppressed by the executor.
     */
    private void resendWaitingFilters() {
        List<EventLogFilter> waitingReqFilters;
        try {
            waitingReqFilters = this.filterManager.getWaitingReqFilters();
        } catch (Exception e) {
            logger.error("resendWaitingFilters exception : {}", e.getMessage(), e);
            return;
        }
        try {
            for (EventLogFilter waiting : waitingReqFilters) {
                sendFilter(waiting);
            }
            logger.info("Resend waiting filters, size: {}", waitingReqFilters.size());
        } catch (Exception e) {
            logger.error("resendWaitingFilters exception : {}", e.getMessage(), e);
            // Put the whole batch back so the next run retries it.
            for (EventLogFilter waiting : waitingReqFilters) {
                waiting.setStatus(EventLogFilterStatus.WAITING_REQUEST);
            }
        }
    }

    /**
     * Serializes the filter and sends a register request for it to the group.
     *
     * <p>If serialization fails the filter is removed from the manager, its callback (when set)
     * is told {@code OTHER_ERROR}, and nothing is sent.
     */
    private void sendFilter(EventLogFilter eventLogFilter) {
        // NOTE(review): the original serialized before reading the filter id; that order is kept.
        String paramJsonString;
        try {
            paramJsonString = eventLogFilter.getNewParamJsonString(String.valueOf(this.groupId));
        } catch (JsonProcessingException e) {
            logger.error("send filter error and remove bad filter, registerID: {},filterID : {}, error: {}", eventLogFilter.getRegisterID(), eventLogFilter.getFilterID(), e.getMessage(), e);
            this.filterManager.removeFilter(eventLogFilter.getRegisterID());
            if (eventLogFilter.getCallback() != null) {
                eventLogFilter.getCallback().onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
            }
            // Do not register a callback or send a payload-less request for a removed filter.
            return;
        }
        this.filterManager.addCallback(eventLogFilter.getFilterID(), eventLogFilter.getCallback());
        EventMsg eventMsg = buildEventMsg((short) MsgType.CLIENT_REGISTER_EVENT_LOG.getType(), paramJsonString);
        this.groupManagerService.asyncSendMessageToGroup(this.groupId, eventMsg, new RegisterEventSubRespCallback(this.filterManager, eventLogFilter, eventLogFilter.getFilterID(), eventLogFilter.getRegisterID()));
    }

    /** Builds an {@link EventMsg} with a fresh seq, result 0, empty topic and a UTF-8 JSON payload. */
    private EventMsg buildEventMsg(short msgType, String paramJsonString) {
        Message message = new Message();
        message.setSeq(EventSubscribe.newSeq());
        message.setType(Short.valueOf(msgType));
        message.setResult(0);
        // Explicit charset: the no-arg getBytes() depends on the platform default.
        message.setData(paramJsonString.getBytes(StandardCharsets.UTF_8));
        EventMsg eventMsg = new EventMsg(message);
        eventMsg.setTopic("");
        eventMsg.setData(message.getData());
        return eventMsg;
    }
}
