/*
 * Decompiled with CFR 0.152.
 */
package com.networknt.saga.orchestration;

import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.jdbc.IdGenerator;
import com.networknt.saga.common.LockTarget;
import com.networknt.saga.common.SagaReplyHeaders;
import com.networknt.saga.common.SagaUnlockCommand;
import com.networknt.saga.orchestration.AggregateInstanceSubscriptionsDAO;
import com.networknt.saga.orchestration.DestinationAndResource;
import com.networknt.saga.orchestration.EnlistedAggregate;
import com.networknt.saga.orchestration.EnlistedAggregatesDao;
import com.networknt.saga.orchestration.EventClassAndAggregateId;
import com.networknt.saga.orchestration.EventToPublish;
import com.networknt.saga.orchestration.ReplyClassAndHandler;
import com.networknt.saga.orchestration.Saga;
import com.networknt.saga.orchestration.SagaActions;
import com.networknt.saga.orchestration.SagaDataSerde;
import com.networknt.saga.orchestration.SagaDefinition;
import com.networknt.saga.orchestration.SagaEventHandler;
import com.networknt.saga.orchestration.SagaInstance;
import com.networknt.saga.orchestration.SagaInstanceData;
import com.networknt.saga.orchestration.SagaInstanceRepository;
import com.networknt.saga.orchestration.SagaManager;
import com.networknt.saga.orchestration.SagaTypeAndId;
import com.networknt.saga.participant.SagaLockManager;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.tram.command.common.ChannelMapping;
import com.networknt.tram.command.common.Command;
import com.networknt.tram.command.common.CommandMessageHeaders;
import com.networknt.tram.command.consumer.CommandWithDestination;
import com.networknt.tram.command.producer.CommandProducer;
import com.networknt.tram.event.common.DomainEvent;
import com.networknt.tram.event.publisher.DomainEventPublisher;
import com.networknt.tram.event.subscriber.DomainEventEnvelope;
import com.networknt.tram.event.subscriber.DomainEventEnvelopeImpl;
import com.networknt.tram.message.common.Message;
import com.networknt.tram.message.consumer.MessageConsumer;
import com.networknt.tram.message.producer.MessageProducer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SagaManager} implementation that drives instances of one {@link Saga} type.
 *
 * <p>On construction the manager subscribes {@link #handleMessage(Message)} to the saga's
 * reply channel ({@code <sagaType>-reply}, passed through the {@link ChannelMapping}).
 * Incoming messages are either command replies, which are dispatched to the reply handler of
 * the saga's current state, or aggregate events, which are dispatched to the sagas subscribed
 * to that aggregate instance.
 *
 * <p>All collaborators are looked up through {@link SingletonServiceFactory} when the instance
 * is created.
 *
 * @param <Data> type of the saga data carried by each saga instance
 */
public class SagaManagerImpl<Data>
implements SagaManager<Data> {
    /** State name stored on a freshly created saga instance until the starting handler supplies one. */
    public static final String DEFAULT_STATE_NAME = "{\"currentlyExecuting\":-1,\"compensating\":false,\"endState\":false}";

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final SagaInstanceRepository sagaInstanceRepository = (SagaInstanceRepository)SingletonServiceFactory.getBean(SagaInstanceRepository.class);
    private final MessageConsumer messageConsumer = (MessageConsumer)SingletonServiceFactory.getBean(MessageConsumer.class);
    private final IdGenerator idGenerator = (IdGenerator)SingletonServiceFactory.getBean(IdGenerator.class);
    private final AggregateInstanceSubscriptionsDAO aggregateInstanceSubscriptionsDAO = (AggregateInstanceSubscriptionsDAO)SingletonServiceFactory.getBean(AggregateInstanceSubscriptionsDAO.class);
    private final EnlistedAggregatesDao enlistedAggregatesDao = (EnlistedAggregatesDao)SingletonServiceFactory.getBean(EnlistedAggregatesDao.class);
    private final ChannelMapping channelMapping = (ChannelMapping)SingletonServiceFactory.getBean(ChannelMapping.class);
    private final MessageProducer messageProducer = (MessageProducer)SingletonServiceFactory.getBean(MessageProducer.class);
    private final CommandProducer commandProducer = (CommandProducer)SingletonServiceFactory.getBean(CommandProducer.class);
    private final SagaLockManager sagaLockManager = (SagaLockManager)SingletonServiceFactory.getBean(SagaLockManager.class);
    private final DomainEventPublisher domainEventPublisher = (DomainEventPublisher)SingletonServiceFactory.getBean(DomainEventPublisher.class);
    private final Saga<Data> saga;

    /**
     * Creates a manager for the given saga and subscribes it to the saga's reply channel.
     *
     * @param saga the saga definition holder; must not be {@code null}
     * @throws NullPointerException if {@code saga} is {@code null}
     */
    public SagaManagerImpl(Saga<Data> saga) {
        this.saga = Objects.requireNonNull(saga, "saga");
        this.registerReplySubscription();
    }

    /**
     * Creates and starts a saga instance without claiming a lock.
     *
     * @param sagaData initial saga data
     * @return the persisted saga instance
     */
    @Override
    public SagaInstance create(Data sagaData) {
        return this.create(sagaData, Optional.empty());
    }

    /**
     * Creates and starts a saga instance, claiming a lock on the target built from
     * {@code targetClass} and {@code targetId} via {@link LockTarget}.
     *
     * @param data initial saga data
     * @param targetClass class of the resource to lock
     * @param targetId identifier of the resource to lock
     * @return the persisted saga instance
     */
    @Override
    public SagaInstance create(Data data, Class targetClass, Object targetId) {
        return this.create(data, Optional.of(new LockTarget(targetClass, targetId).getTarget()));
    }

    /**
     * Creates a saga instance, optionally claims a lock, runs the starting handler and carries
     * out the actions it returns (commands, events, state change, enlisted aggregates, event
     * subscriptions).
     *
     * @param sagaData initial saga data
     * @param resource lock target to claim for the new saga, if any
     * @return the persisted saga instance
     * @throws IllegalArgumentException if {@code resource} is present and the lock cannot be claimed
     * @throws java.util.NoSuchElementException if the saga definition has no starting handler
     */
    @Override
    @SuppressWarnings("unchecked")
    public SagaInstance create(Data sagaData, Optional<String> resource) {
        SagaInstance sagaInstance = new SagaInstance(this.getSagaType(), null, DEFAULT_STATE_NAME, null, SagaDataSerde.serializeSagaData(sagaData), new HashSet<DestinationAndResource>());
        // The instance is saved first; its id is read back afterwards and used for the lock and all headers.
        this.sagaInstanceRepository.save(sagaInstance);
        String sagaId = sagaInstance.getId();
        if (resource.isPresent() && !this.sagaLockManager.claimLock(this.getSagaType(), sagaId, resource.get())) {
            throw new IllegalArgumentException("Cannot claim lock for resource");
        }
        SagaActions<Data> actions = (SagaActions<Data>)this.getStateDefinition().getStartingHandler().get().apply(sagaData);
        Optional<String> possibleNewState = actions.getUpdatedState();
        // Only overwrite the state when the handler supplied one; an absent state keeps DEFAULT_STATE_NAME
        // instead of failing on an unchecked Optional.get().
        this.maybeUpdateState(sagaInstance, possibleNewState);
        this.sagaInstanceRepository.update(sagaInstance);
        Data updatedData = actions.getUpdatedSagaData().orElse(sagaData);
        sagaInstance.setLastRequestId(this.sendCommands(sagaId, actions.getCommands()));
        sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(updatedData));
        this.publishEvents(sagaId, actions.getEventsToPublish(), possibleNewState);
        this.maybePerformEndStateActions(sagaId, sagaInstance, possibleNewState);
        this.sagaInstanceRepository.update(sagaInstance);
        this.updateEnlistedAggregates(sagaId, actions.getEnlistedAggregates());
        this.updateEventInstanceSubscriptions(updatedData, sagaId, sagaInstance.getStateName());
        return sagaInstance;
    }

    /**
     * Publishes the given events through the {@link DomainEventPublisher}.
     *
     * <p>When {@code updatedState} is an end state, each event carries a
     * {@code participating-saga-ids} header listing the other sagas enlisted on the event's
     * aggregate (this saga's own id is left out), joined with commas.
     *
     * @param sagaId id of the saga publishing the events
     * @param eventsToPublish events requested by the handler
     * @param updatedState state requested by the handler, if any
     */
    private void publishEvents(String sagaId, Set<EventToPublish> eventsToPublish, Optional<String> updatedState) {
        boolean isEndState = updatedState.filter(state -> this.getStateDefinition().isEndState(state)).isPresent();
        for (EventToPublish event : eventsToPublish) {
            HashMap<String, String> headers = new HashMap<String, String>();
            if (isEndState) {
                Set<String> sagaIds = this.enlistedAggregatesDao.findSagas(event.getAggregateType(), event.getAggregateId());
                // Filter rather than remove(): the set belongs to the DAO and may be shared or unmodifiable.
                String otherSagaIds = sagaIds.stream()
                        .filter(id -> !Objects.equals(sagaId, id))
                        .collect(Collectors.joining(","));
                headers.put("participating-saga-ids", otherSagaIds);
            }
            this.domainEventPublisher.publish(event.getAggregateType().getName(), (Object)event.getAggregateId(), headers, event.getDomainEvents());
        }
    }

    /**
     * Sends a {@link SagaUnlockCommand} to every destination/resource pair recorded on the
     * saga instance.
     *
     * @param sagaId id of the saga that reached an end state
     * @param sagaInstance instance whose recorded destinations and resources are unlocked
     */
    private void performEndStateActions(String sagaId, SagaInstance sagaInstance) {
        for (DestinationAndResource dr : sagaInstance.getDestinationsAndResources()) {
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("command_saga_id", sagaId);
            headers.put("command_saga_type", this.getSagaType());
            this.commandProducer.send(dr.getDestination(), dr.getResource(), (Command)new SagaUnlockCommand(), this.makeSagaReplyChannel(), headers);
        }
    }

    /**
     * Returns the saga's definition.
     *
     * @throws NullPointerException if the saga returns a {@code null} definition
     */
    private SagaDefinition<Data> getStateDefinition() {
        return Objects.requireNonNull(this.saga.getSagaDefinition());
    }

    /** Returns the saga type name reported by the saga. */
    private String getSagaType() {
        return this.saga.getSagaType();
    }

    /**
     * Subscribes {@link #handleMessage(Message)} to the saga's reply channel. The constructor
     * already performs the same subscription.
     */
    public void subscribeToReplyChannel() {
        this.registerReplySubscription();
    }

    /**
     * Registers {@link #handleMessage(Message)} with the message consumer under the subscriber id
     * {@code <saga class name>-consumer} for the mapped reply channel.
     */
    private void registerReplySubscription() {
        this.messageConsumer.subscribe(this.saga.getClass().getName() + "-consumer", Collections.singleton(this.channelMapping.transform(this.makeSagaReplyChannel())), this::handleMessage);
    }

    /** Returns the unmapped reply channel name, {@code <sagaType>-reply}. */
    private String makeSagaReplyChannel() {
        return this.getSagaType() + "-reply";
    }

    /**
     * Replaces the saga's aggregate-instance event subscriptions with the event handlers the
     * definition reports for the given state and data.
     */
    private void updateEventInstanceSubscriptions(Data sagaData, String sagaId, String stateName) {
        List<EventClassAndAggregateId> instanceEvents = this.getStateDefinition().findEventHandlers(this.saga, stateName, sagaData);
        this.aggregateInstanceSubscriptionsDAO.update(this.getSagaType(), sagaId, instanceEvents);
    }

    /**
     * Sends each command with saga type, saga id and a freshly generated request id as headers.
     *
     * @param sagaId id of the sending saga
     * @param commands commands to send, in order
     * @return the request id of the last command sent, or {@code null} if {@code commands} is empty
     */
    private String sendCommands(String sagaId, List<CommandWithDestination> commands) {
        String lastRequestId = null;
        for (CommandWithDestination command : commands) {
            lastRequestId = this.idGenerator.genId().asString();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("command_saga_type", this.getSagaType());
            headers.put("command_saga_id", sagaId);
            headers.put("command_saga_request_id", lastRequestId);
            this.commandProducer.send(command.getDestinationChannel(), command.getResource(), command.getCommand(), this.makeSagaReplyChannel(), headers);
        }
        return lastRequestId;
    }

    /**
     * Entry point for messages arriving on the reply channel.
     *
     * <p>A message with the saga-id reply header is treated as a command reply. Otherwise a
     * message with an {@code event-type} header is treated as an aggregate event and dispatched
     * to every saga subscribed to that aggregate instance and event type. Anything else is
     * logged and ignored.
     *
     * @param message the received message
     */
    public void handleMessage(Message message) {
        this.logger.debug("handle message invoked {}", message);
        if (message.hasHeader(SagaReplyHeaders.REPLY_SAGA_ID)) {
            this.handleReply(message);
        } else if (message.hasHeader("event-type")) {
            String aggregateType = message.getRequiredHeader("event-aggregate-type");
            String aggregateId = message.getRequiredHeader("PARTITION_ID");
            String eventType = message.getRequiredHeader("event-type");
            for (SagaTypeAndId sagaTypeAndId : this.aggregateInstanceSubscriptionsDAO.findSagas(aggregateType, aggregateId, eventType)) {
                this.handleAggregateInstanceEvent(sagaTypeAndId.getSagaType(), sagaTypeAndId.getSagaId(), message, aggregateType, aggregateId, eventType);
            }
        } else {
            this.logger.warn("Handle message doesn't know what to do with: {} ", message);
        }
    }

    /**
     * Loads the saga instance, invokes the event handler registered for its current state and
     * persists the (possibly mutated) saga data.
     *
     * <p>NOTE(review): the {@link SagaActions} returned by the handler are discarded, so any
     * commands, events or state change it requests are not acted on, and the handler receives an
     * envelope built from all-null arguments rather than the received message. Confirm whether
     * this is intentional.
     *
     * @throws NumberFormatException if {@code aggregateId} is not a parsable {@code long}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void handleAggregateInstanceEvent(String sagaType, String sagaId, Message message, String aggregateType, String aggregateId, String eventType) {
        this.logger.info("Got handleAggregateInstanceEvent: {}, type={}, instance={}", message, sagaType, sagaId);
        // Raw type plus cast: the repository's declared generic signature is not visible from this file.
        SagaInstanceData sagaInstanceAndData = this.sagaInstanceRepository.findWithData(sagaType, sagaId);
        SagaInstance sagaInstance = sagaInstanceAndData.getSagaInstance();
        Data sagaData = (Data)sagaInstanceAndData.getSagaData();
        String currentState = sagaInstance.getStateName();
        this.logger.info("Current state={}", currentState);
        Optional<SagaEventHandler<Data>> eventHandler = this.getStateDefinition().findEventHandler(this.saga, currentState, sagaData, aggregateType, Long.parseLong(aggregateId), eventType);
        if (!eventHandler.isPresent()) {
            this.logger.error("No event handler for: {}", message);
            return;
        }
        this.logger.info("Invoking event handler for {}", message);
        eventHandler.get().getAction().apply(sagaData, (DomainEventEnvelope<DomainEvent>)new DomainEventEnvelopeImpl(null, null, null, null, null));
        sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(sagaData));
        this.sagaInstanceRepository.update(sagaInstance);
    }

    /**
     * Processes a command reply addressed to this saga type.
     *
     * <p>Replies for other saga types and duplicate replies are ignored. If the reply carries a
     * {@code saga-locked-target} header, the destination/resource pair is recorded on the
     * instance. When the instance is not already in an end state, the reply handler for the
     * current state is invoked and its actions are carried out: events are published, the state
     * is updated when one is supplied, end-state unlock commands are sent, enlisted aggregates
     * and event subscriptions are updated, follow-up commands are sent and the instance is
     * persisted. When the instance is already in an end state nothing is dispatched or persisted.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void handleReply(Message message) {
        if (!this.isReplyForThisSagaType(message)) {
            return;
        }
        this.logger.debug("Handle reply: {}", message);
        String sagaId = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_ID);
        String sagaType = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_TYPE);
        String requestId = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_REQUEST_ID);
        if (this.isDuplicateReply(message.getId(), sagaType, sagaId)) {
            return;
        }
        // Value is unused; the call is kept so that a missing reply_type header is still rejected
        // (presumably getRequiredHeader fails when the header is absent -- confirm).
        message.getRequiredHeader("reply_type");
        String messageJson = message.getPayload();
        // Raw type plus cast: the repository's declared generic signature is not visible from this file.
        SagaInstanceData sagaInstanceAndData = this.sagaInstanceRepository.findWithData(sagaType, sagaId);
        SagaInstance sagaInstance = sagaInstanceAndData.getSagaInstance();
        Data sagaData = (Data)sagaInstanceAndData.getSagaData();
        message.getHeader("saga-locked-target").ifPresent(lockedTarget -> {
            String destination = message.getRequiredHeader(CommandMessageHeaders.inReply("command__destination"));
            sagaInstance.addDestinationsAndResources(Collections.singleton(new DestinationAndResource(destination, lockedTarget)));
        });
        String currentState = sagaInstance.getStateName();
        this.logger.info("Current state={}", currentState);
        if (this.getStateDefinition().isEndState(currentState)) {
            return;
        }
        Optional<ReplyClassAndHandler> replyHandler = this.getStateDefinition().findReplyHandler(this.saga, sagaInstance, currentState, sagaData, requestId, message);
        if (!replyHandler.isPresent()) {
            this.logger.error("No handler for {}", message);
            return;
        }
        ReplyClassAndHandler m = replyHandler.get();
        Object param = JSonMapper.fromJson(messageJson, m.getReplyClass());
        SagaActions<Data> actions = (SagaActions<Data>)m.getReplyHandler().apply(sagaData, param);
        List<CommandWithDestination> commands = actions.getCommands();
        Data updatedData = actions.getUpdatedSagaData().orElse(sagaData);
        this.logger.info("Handled reply. Sending commands {}", commands);
        Optional<String> possibleNewState = actions.getUpdatedState();
        this.publishEvents(sagaId, actions.getEventsToPublish(), possibleNewState);
        // A handler that returns no state change keeps the current state; the previous unchecked
        // Optional.get() here threw after the events above had already been published.
        this.maybeUpdateState(sagaInstance, possibleNewState);
        this.sagaInstanceRepository.update(sagaInstance);
        this.maybePerformEndStateActions(sagaId, sagaInstance, possibleNewState);
        this.updateEnlistedAggregates(sagaId, actions.getEnlistedAggregates());
        sagaInstance.setLastRequestId(this.sendCommands(sagaId, commands));
        this.updateEventInstanceSubscriptions(updatedData, sagaId, sagaInstance.getStateName());
        sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(updatedData));
        this.sagaInstanceRepository.update(sagaInstance);
    }

    /** Returns {@code true} when the message's saga-type reply header equals this manager's saga type. */
    private boolean isReplyForThisSagaType(Message message) {
        return message.getHeader(SagaReplyHeaders.REPLY_SAGA_TYPE).map(x -> x.equals(this.getSagaType())).orElse(false);
    }

    /** Builds a {@link DestinationAndResource} from a command's destination channel and resource. */
    private DestinationAndResource toDestinationAndResource(CommandWithDestination commandToSend) {
        return new DestinationAndResource(commandToSend.getDestinationChannel(), commandToSend.getResource());
    }

    /** Saves the aggregates enlisted by the saga through the {@link EnlistedAggregatesDao}. */
    private void updateEnlistedAggregates(String sagaId, Set<EnlistedAggregate> enlistedAggregates) {
        this.enlistedAggregatesDao.save(sagaId, enlistedAggregates);
    }

    /** Sets the instance's state name when a new state is present; otherwise leaves it untouched. */
    private void maybeUpdateState(SagaInstance sagaInstance, Optional<String> possibleNewState) {
        possibleNewState.ifPresent(sagaInstance::setStateName);
    }

    /** Runs the end-state actions when a new state is present and the definition marks it as an end state. */
    private void maybePerformEndStateActions(String sagaId, SagaInstance sagaInstance, Optional<String> possibleNewState) {
        if (possibleNewState.filter(state -> this.getStateDefinition().isEndState(state)).isPresent()) {
            this.performEndStateActions(sagaId, sagaInstance);
        }
    }

    /**
     * Duplicate-reply check. Currently a placeholder that never reports a duplicate.
     *
     * <p>TODO(review): no de-duplication is performed; {@link #makeConsumerIdFor(String, String)}
     * looks like the intended key for such a check -- confirm and implement if required.
     *
     * @return always {@code false}
     */
    private boolean isDuplicateReply(String messageId, String sagaType, String sagaId) {
        return false;
    }

    /** Returns {@code consumer-<sagaType>-<sagaId>}. */
    private String makeConsumerIdFor(String sagaType, String sagaId) {
        return "consumer-" + sagaType + "-" + sagaId;
    }
}

