package com.networknt.saga.orchestration;

import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.jdbc.IdGenerator;
import com.networknt.saga.common.LockTarget;
import com.networknt.saga.common.SagaCommandHeaders;
import com.networknt.saga.common.SagaReplyHeaders;
import com.networknt.saga.common.SagaUnlockCommand;
import com.networknt.saga.participant.SagaLockManager;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.tram.command.common.ChannelMapping;
import com.networknt.tram.command.common.CommandMessageHeaders;
import com.networknt.tram.command.consumer.CommandWithDestination;
import com.networknt.tram.command.producer.CommandProducer;
import com.networknt.tram.event.common.DomainEvent;
import com.networknt.tram.event.publisher.DomainEventPublisher;
import com.networknt.tram.event.subscriber.DomainEventEnvelopeImpl;
import com.networknt.tram.message.common.Message;
import com.networknt.tram.message.consumer.MessageConsumer;
import com.networknt.tram.message.producer.MessageProducer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/saga/orchestration/SagaManagerImpl.class */
public class SagaManagerImpl<Data> implements SagaManager<Data> {
    public static final String DEFAULT_STATE_NAME = "default-name";
    private Saga<Data> saga;
    private Logger logger = LoggerFactory.getLogger(getClass());
    private SagaInstanceRepository sagaInstanceRepository = (SagaInstanceRepository) SingletonServiceFactory.getBean(SagaInstanceRepository.class);
    private MessageConsumer messageConsumer = (MessageConsumer) SingletonServiceFactory.getBean(MessageConsumer.class);
    private IdGenerator idGenerator = (IdGenerator) SingletonServiceFactory.getBean(IdGenerator.class);
    private AggregateInstanceSubscriptionsDAO aggregateInstanceSubscriptionsDAO = (AggregateInstanceSubscriptionsDAO) SingletonServiceFactory.getBean(AggregateInstanceSubscriptionsDAO.class);
    private EnlistedAggregatesDao enlistedAggregatesDao = (EnlistedAggregatesDao) SingletonServiceFactory.getBean(EnlistedAggregatesDao.class);
    private ChannelMapping channelMapping = (ChannelMapping) SingletonServiceFactory.getBean(ChannelMapping.class);
    private MessageProducer messageProducer = (MessageProducer) SingletonServiceFactory.getBean(MessageProducer.class);
    private CommandProducer commandProducer = (CommandProducer) SingletonServiceFactory.getBean(CommandProducer.class);
    private SagaLockManager sagaLockManager = (SagaLockManager) SingletonServiceFactory.getBean(SagaLockManager.class);
    private DomainEventPublisher domainEventPublisher = (DomainEventPublisher) SingletonServiceFactory.getBean(DomainEventPublisher.class);

    public SagaManagerImpl(Saga<Data> saga) {
        this.saga = saga;
        this.messageConsumer.subscribe(saga.getClass().getName() + "-consumer", Collections.singleton(this.channelMapping.transform(makeSagaReplyChannel())), this::handleMessage);
    }

    @Override // com.networknt.saga.orchestration.SagaManager
    public SagaInstance create(Data data) {
        return create(data, Optional.empty());
    }

    @Override // com.networknt.saga.orchestration.SagaManager
    public SagaInstance create(Data data, Class cls, Object obj) {
        return create(data, Optional.of(new LockTarget(cls, obj).getTarget()));
    }

    @Override // com.networknt.saga.orchestration.SagaManager
    public SagaInstance create(Data data, Optional<String> optional) {
        SagaInstance sagaInstance = new SagaInstance(getSagaType(), null, DEFAULT_STATE_NAME, null, SagaDataSerde.serializeSagaData(data), new HashSet());
        this.sagaInstanceRepository.save(sagaInstance);
        String id = sagaInstance.getId();
        if (optional.isPresent() && !this.sagaLockManager.claimLock(getSagaType(), id, optional.get())) {
            throw new IllegalArgumentException("Cannot claim lock for resource");
        }
        NewSagaActions apply = getStateDefinition().getStartingHandler().get().apply(data);
        List<CommandWithDestination> commands = apply.getCommands();
        Data orElse = apply.getUpdatedSagaData().orElse(data);
        sagaInstance.setLastRequestId(sendCommands(id, commands));
        sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(orElse));
        publishEvents(id, apply.getEventsToPublish(), apply.getUpdatedState());
        Optional<String> updatedState = apply.getUpdatedState();
        maybeUpdateState(sagaInstance, updatedState);
        maybePerformEndStateActions(id, sagaInstance, updatedState);
        this.sagaInstanceRepository.update(sagaInstance);
        updateEnlistedAggregates(id, apply.getEnlistedAggregates());
        updateEventInstanceSubscriptions(orElse, id, sagaInstance.getStateName());
        return sagaInstance;
    }

    private void publishEvents(String str, Set<EventToPublish> set, Optional<String> optional) {
        Collections.emptySet();
        boolean isPresent = optional.filter(str2 -> {
            return getStateDefinition().isEndState(str2);
        }).isPresent();
        for (EventToPublish eventToPublish : set) {
            HashMap hashMap = new HashMap();
            if (isPresent) {
                Set<String> findSagas = this.enlistedAggregatesDao.findSagas(eventToPublish.getAggregateType(), eventToPublish.getAggregateId());
                findSagas.remove(str);
                hashMap.put("participating-saga-ids", (String) findSagas.stream().collect(Collectors.joining(",")));
            }
            this.domainEventPublisher.publish(eventToPublish.getAggregateType().getName(), eventToPublish.getAggregateId(), hashMap, eventToPublish.getDomainEvents());
        }
    }

    private void performEndStateActions(String str, SagaInstance sagaInstance) {
        for (DestinationAndResource destinationAndResource : sagaInstance.getDestinationsAndResources()) {
            HashMap hashMap = new HashMap();
            hashMap.put(SagaCommandHeaders.SAGA_ID, str);
            hashMap.put(SagaCommandHeaders.SAGA_TYPE, getSagaType());
            this.commandProducer.send(destinationAndResource.getDestination(), destinationAndResource.getResource(), new SagaUnlockCommand(), makeSagaReplyChannel(), hashMap);
        }
    }

    private SagaDefinition<Data> getStateDefinition() {
        SagaDefinition<Data> sagaDefinition = this.saga.getSagaDefinition();
        Objects.requireNonNull(sagaDefinition);
        return sagaDefinition;
    }

    private String getSagaType() {
        return this.saga.getSagaType();
    }

    @PostConstruct
    public void subscribeToReplyChannel() {
        this.messageConsumer.subscribe(this.saga.getClass().getName() + "-consumer", Collections.singleton(this.channelMapping.transform(makeSagaReplyChannel())), this::handleMessage);
    }

    private String makeSagaReplyChannel() {
        return getSagaType() + "-reply";
    }

    private void updateEventInstanceSubscriptions(Data data, String str, String str2) {
        this.aggregateInstanceSubscriptionsDAO.update(getSagaType(), str, getStateDefinition().findEventHandlers(this.saga, str2, data));
    }

    private String sendCommands(String str, List<CommandWithDestination> list) {
        String str2 = null;
        for (CommandWithDestination commandWithDestination : list) {
            str2 = this.idGenerator.genId().asString();
            HashMap hashMap = new HashMap();
            hashMap.put(SagaCommandHeaders.SAGA_TYPE, getSagaType());
            hashMap.put(SagaCommandHeaders.SAGA_ID, str);
            hashMap.put(SagaCommandHeaders.SAGA_REQUEST_ID, str2);
            this.commandProducer.send(commandWithDestination.getDestinationChannel(), commandWithDestination.getResource(), commandWithDestination.getCommand(), makeSagaReplyChannel(), hashMap);
        }
        return str2;
    }

    public void handleMessage(Message message) {
        this.logger.debug("handle message invoked {}", message);
        if (message.hasHeader(SagaReplyHeaders.REPLY_SAGA_ID)) {
            handleReply(message);
            return;
        }
        if (!message.hasHeader("event-type")) {
            this.logger.warn("Handle message doesn't know what to do with: {} ", message);
            return;
        }
        String requiredHeader = message.getRequiredHeader("event-aggregate-type");
        String requiredHeader2 = message.getRequiredHeader("PARTITION_ID");
        String requiredHeader3 = message.getRequiredHeader("event-type");
        for (SagaTypeAndId sagaTypeAndId : this.aggregateInstanceSubscriptionsDAO.findSagas(requiredHeader, requiredHeader2, requiredHeader3)) {
            handleAggregateInstanceEvent(sagaTypeAndId.getSagaType(), sagaTypeAndId.getSagaId(), message, requiredHeader, requiredHeader2, requiredHeader3);
        }
    }

    private void handleAggregateInstanceEvent(String str, String str2, Message message, String str3, String str4, String str5) {
        System.out.println("Got handleAggregateInstanceEvent: " + message + ", type=" + str + ", instance=" + str2);
        SagaInstanceData<Data> findWithData = this.sagaInstanceRepository.findWithData(str, str2);
        SagaInstance sagaInstance = findWithData.getSagaInstance();
        Data sagaData = findWithData.getSagaData();
        String stateName = sagaInstance.getStateName();
        this.logger.info("Current state={}", stateName);
        Optional<SagaEventHandler<Data>> findEventHandler = getStateDefinition().findEventHandler(this.saga, stateName, sagaData, str3, Long.parseLong(str4), str5);
        if (!findEventHandler.isPresent()) {
            this.logger.error("No event handler for: {}", message);
            return;
        }
        this.logger.info("Invoking event handler for {}", message);
        findEventHandler.get().getAction().apply(sagaData, new DomainEventEnvelopeImpl((Message) null, (String) null, (String) null, (String) null, (DomainEvent) null));
        sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(sagaData));
        this.sagaInstanceRepository.update(sagaInstance);
    }

    private void handleReply(Message message) {
        if (isReplyForThisSagaType(message).booleanValue()) {
            this.logger.debug("Handle reply: {}", message);
            String requiredHeader = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_ID);
            String requiredHeader2 = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_TYPE);
            String requiredHeader3 = message.getRequiredHeader(SagaReplyHeaders.REPLY_SAGA_REQUEST_ID);
            if (isDuplicateReply(message.getId(), requiredHeader2, requiredHeader)) {
                return;
            }
            message.getRequiredHeader("reply_type");
            String payload = message.getPayload();
            SagaInstanceData<Data> findWithData = this.sagaInstanceRepository.findWithData(requiredHeader2, requiredHeader);
            SagaInstance sagaInstance = findWithData.getSagaInstance();
            Data sagaData = findWithData.getSagaData();
            message.getHeader(SagaReplyHeaders.REPLY_LOCKED).ifPresent(str -> {
                sagaInstance.addDestinationsAndResources(Collections.singleton(new DestinationAndResource(message.getRequiredHeader(CommandMessageHeaders.inReply("command__destination")), str)));
            });
            String stateName = sagaInstance.getStateName();
            this.logger.info("Current state={}", stateName);
            Optional<ReplyClassAndHandler> findReplyHandler = getStateDefinition().findReplyHandler(this.saga, sagaInstance, stateName, sagaData, requiredHeader3, message);
            if (!findReplyHandler.isPresent()) {
                this.logger.error("No handler for {}", message);
                return;
            }
            ReplyClassAndHandler replyClassAndHandler = findReplyHandler.get();
            SagaActions apply = replyClassAndHandler.getReplyHandler().apply(sagaData, JSonMapper.fromJson(payload, replyClassAndHandler.getReplyClass()));
            List<CommandWithDestination> commands = apply.getCommands();
            Data orElse = apply.getUpdatedSagaData().orElse(sagaData);
            this.logger.info("Handled reply. Sending commands {}", commands);
            publishEvents(requiredHeader, apply.getEventsToPublish(), apply.getUpdatedState());
            Optional<String> updatedState = apply.getUpdatedState();
            maybeUpdateState(sagaInstance, updatedState);
            maybePerformEndStateActions(requiredHeader, sagaInstance, updatedState);
            updateEnlistedAggregates(requiredHeader, apply.getEnlistedAggregates());
            sagaInstance.setLastRequestId(sendCommands(requiredHeader, commands));
            updateEventInstanceSubscriptions(orElse, requiredHeader, sagaInstance.getStateName());
            sagaInstance.setSerializedSagaData(SagaDataSerde.serializeSagaData(orElse));
            this.sagaInstanceRepository.update(sagaInstance);
        }
    }

    private Boolean isReplyForThisSagaType(Message message) {
        return (Boolean) message.getHeader(SagaReplyHeaders.REPLY_SAGA_TYPE).map(str -> {
            return Boolean.valueOf(str.equals(getSagaType()));
        }).orElse(false);
    }

    private DestinationAndResource toDestinationAndResource(CommandWithDestination commandWithDestination) {
        return new DestinationAndResource(commandWithDestination.getDestinationChannel(), commandWithDestination.getResource());
    }

    private void updateEnlistedAggregates(String str, Set<EnlistedAggregate> set) {
        this.enlistedAggregatesDao.save(str, set);
    }

    private void maybeUpdateState(SagaInstance sagaInstance, Optional<String> optional) {
        Objects.requireNonNull(sagaInstance);
        optional.ifPresent(sagaInstance::setStateName);
    }

    private void maybePerformEndStateActions(String str, SagaInstance sagaInstance, Optional<String> optional) {
        optional.ifPresent(str2 -> {
            if (getStateDefinition().isEndState(str2)) {
                performEndStateActions(str, sagaInstance);
            }
        });
    }

    private boolean isDuplicateReply(String str, String str2, String str3) {
        makeConsumerIdFor(str2, str3);
        return false;
    }

    private String makeConsumerIdFor(String str, String str2) {
        return "consumer-" + str + "-" + str2;
    }
}
