package org.eclipse.edc.connector.contract.negotiation;

import io.opentelemetry.extension.annotations.WithSpan;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import org.eclipse.edc.connector.contract.negotiation.AbstractContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.ContractId;
import org.eclipse.edc.connector.contract.spi.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementVerificationMessage;
import org.eclipse.edc.connector.contract.spi.types.command.ContractNegotiationCommand;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.eclipse.edc.statemachine.StateProcessorImpl;
import org.eclipse.edc.statemachine.retry.CompletableFutureRetryProcess;

/* loaded from: input_file:org/eclipse/edc/connector/contract/negotiation/ConsumerContractNegotiationManagerImpl.class */
/**
 * Consumer-side implementation of the contract negotiation manager.
 *
 * <p>On {@link #start()} a {@link StateMachineManager} is built with one processor per handled
 * {@link ContractNegotiationStates} value (INITIAL, REQUESTING, ACCEPTING, AGREED, VERIFYING,
 * TERMINATING) plus one processor that drains the command queue. Each state processor only sees
 * negotiations whose {@code type} is {@link ContractNegotiation.Type#CONSUMER}.
 *
 * <p>Collaborators such as the monitor, negotiation store, dispatcher registry, retry process factory
 * and the {@code transitionTo...} methods are inherited from {@link AbstractContractNegotiationManager}.
 * Instances are created through {@link Builder}.
 */
public class ConsumerContractNegotiationManagerImpl extends AbstractContractNegotiationManager implements ConsumerContractNegotiationManager {

    /** Name passed to the state machine manager builder. */
    private static final String STATE_MACHINE_NAME = "consumer-contract-negotiation";

    /** Protocol identifier for which the verification step is skipped in {@link #processAgreed}. */
    private static final String IDS_MULTIPART_PROTOCOL = "ids-multipart";

    /** Maximum number of commands requested from the command queue per processor iteration. */
    private static final int COMMAND_BATCH_SIZE = 5;

    /** Field name used in the store query that restricts results to consumer negotiations. */
    private static final String TYPE_FIELD = "type";

    /** Created in {@link #start()}; stays {@code null} until then. */
    private StateMachineManager stateMachineManager;

    private ConsumerContractNegotiationManagerImpl() {
    }

    /**
     * Builds the state machine with all consumer state processors and the command processor, then starts it.
     */
    public void start() {
        this.stateMachineManager = StateMachineManager.Builder
                .newInstance(STATE_MACHINE_NAME, this.monitor, this.executorInstrumentation, this.waitStrategy)
                .processor(processNegotiationsInState(ContractNegotiationStates.INITIAL, this::processInitial))
                .processor(processNegotiationsInState(ContractNegotiationStates.REQUESTING, this::processRequesting))
                .processor(processNegotiationsInState(ContractNegotiationStates.ACCEPTING, this::processAccepting))
                .processor(processNegotiationsInState(ContractNegotiationStates.AGREED, this::processAgreed))
                .processor(processNegotiationsInState(ContractNegotiationStates.VERIFYING, this::processVerifying))
                .processor(processNegotiationsInState(ContractNegotiationStates.TERMINATING, this::processTerminating))
                .processor(onCommands(this::processCommand))
                .build();
        this.stateMachineManager.start();
    }

    /**
     * Stops the state machine if {@link #start()} has been called before; otherwise does nothing.
     */
    public void stop() {
        if (this.stateMachineManager != null) {
            this.stateMachineManager.stop();
        }
    }

    /**
     * Creates a new consumer negotiation from the given request and moves it to the initial state.
     *
     * <p>A random UUID is used both as negotiation id and as correlation id. The contract offer carried
     * by the request data is added to the negotiation before the transition.
     *
     * @param contractRequest request holding the request data (protocol, connector id, counter-party
     *                        address, offer) and the callback addresses
     * @return a successful {@link StatusResult} wrapping the created negotiation
     */
    @WithSpan
    public StatusResult<ContractNegotiation> initiate(ContractRequest contractRequest) {
        String id = UUID.randomUUID().toString();
        ContractRequestData requestData = contractRequest.getRequestData();

        ContractNegotiation negotiation = ContractNegotiation.Builder.newInstance()
                .id(id)
                .correlationId(id)
                .protocol(requestData.getProtocol())
                .counterPartyId(requestData.getConnectorId())
                .counterPartyAddress(requestData.getCounterPartyAddress())
                .callbackAddresses(contractRequest.getCallbackAddresses())
                .traceContext(this.telemetry.getCurrentTraceContext())
                .type(ContractNegotiation.Type.CONSUMER)
                .build();

        negotiation.addContractOffer(requestData.getContractOffer());
        transitionToInitial(negotiation);
        return StatusResult.success(negotiation);
    }

    /**
     * Adds the given command to the command queue; it is picked up by the command processor
     * registered in {@link #start()}.
     *
     * @param contractNegotiationCommand the command to enqueue
     */
    public void enqueueCommand(ContractNegotiationCommand contractNegotiationCommand) {
        this.commandQueue.enqueue(contractNegotiationCommand);
    }

    /**
     * Handles a negotiation in state INITIAL by moving it to REQUESTING.
     *
     * @return always {@code true}
     */
    @WithSpan
    private boolean processInitial(ContractNegotiation negotiation) {
        transitionToRequesting(negotiation);
        return true;
    }

    /**
     * Handles a negotiation in state REQUESTING by sending an initial {@link ContractRequestMessage}
     * with the last contract offer to the counter-party.
     *
     * <p>Outcome handling: success moves to requested, a failure moves back to requesting,
     * exhausted retries move to terminating with an error detail, and a delay invokes {@code breakLease}.
     *
     * @return the result of the retry process execution
     */
    @WithSpan
    private boolean processRequesting(ContractNegotiation negotiation) {
        ContractRequestMessage request = ContractRequestMessage.Builder.newInstance()
                .contractOffer(negotiation.getLastContractOffer())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .callbackAddress(this.protocolWebhook.url())
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .processId(negotiation.getId())
                .type(ContractRequestMessage.Type.INITIAL)
                .build();

        // Kept as one fluent chain so the generic type of the retry process is inferred instead of
        // being lost in a raw-typed local variable.
        return this.entityRetryProcessFactory
                .doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, request))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((current, response) -> transitionToRequested(current))
                .onFailure((current, failure) -> transitionToRequesting(current))
                .onRetryExhausted((current, failure) -> transitionToTerminating(current,
                        String.format("Failed to send %s to provider: %s", request.getClass().getSimpleName(), failure.getMessage())))
                .execute("[Consumer] Send ContractRequestMessage message");
    }

    /**
     * Handles a negotiation in state ACCEPTING by building a {@link ContractAgreement} from the last
     * contract offer and sending it to the counter-party inside a {@link ContractAgreementMessage}.
     *
     * <p>If the id of the last offer cannot be parsed into a valid {@link ContractId}, the problem is
     * logged and the negotiation is moved to terminating with an error detail.
     *
     * <p>Outcome handling of the dispatch: success moves to accepted, a failure moves back to accepting,
     * exhausted retries move to terminating with an error detail, and a delay invokes {@code breakLease}.
     *
     * @return {@code true} if the negotiation was terminated because of an invalid offer id, otherwise
     *         the result of the retry process execution
     */
    @WithSpan
    private boolean processAccepting(ContractNegotiation negotiation) {
        ContractOffer offer = negotiation.getLastContractOffer();
        ContractId offerId = ContractId.parse(offer.getId());
        if (!offerId.isValid()) {
            this.monitor.severe("ConsumerContractNegotiationManagerImpl.approveContractOffers(): Offer Id not correctly formatted.");
            // The stored offer id will not change on a later attempt, so retrying cannot succeed.
            // Terminate explicitly instead of leaving the negotiation in ACCEPTING.
            transitionToTerminating(negotiation, String.format("Offer id '%s' is not correctly formatted", offer.getId()));
            return true;
        }

        Policy policy = offer.getPolicy();
        ContractAgreement agreement = ContractAgreement.Builder.newInstance()
                .id(ContractId.createContractId(offerId.definitionPart(), offer.getAssetId()))
                .contractSigningDate(this.clock.instant().getEpochSecond())
                .providerId(negotiation.getCounterPartyId())
                .consumerId(this.participantId)
                .policy(policy)
                .assetId(offer.getAssetId())
                .build();

        ContractAgreementMessage agreementMessage = ContractAgreementMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .contractAgreement(agreement)
                .processId(negotiation.getId())
                .policy(policy)
                .build();

        return this.entityRetryProcessFactory
                .doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, agreementMessage))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((current, response) -> transitionToAccepted(current))
                .onFailure((current, failure) -> transitionToAccepting(current))
                .onRetryExhausted((current, failure) -> transitionToTerminating(current,
                        String.format("Failed to send %s to provider: %s", agreementMessage.getClass().getSimpleName(), failure.getMessage())))
                .execute("[consumer] send agreement");
    }

    /**
     * Handles a negotiation in state AGREED. Negotiations using the {@code ids-multipart} protocol are
     * finalized directly; all others move on to verifying.
     *
     * @return always {@code true}
     */
    @WithSpan
    private boolean processAgreed(ContractNegotiation negotiation) {
        if (IDS_MULTIPART_PROTOCOL.equals(negotiation.getProtocol())) {
            transitionToFinalized(negotiation);
        } else {
            transitionToVerifying(negotiation);
        }
        return true;
    }

    /**
     * Handles a negotiation in state VERIFYING by sending a {@link ContractAgreementVerificationMessage}
     * to the counter-party.
     *
     * <p>Outcome handling: success moves to verified, a failure moves back to verifying,
     * exhausted retries move to terminating with an error detail, and a delay invokes {@code breakLease}.
     *
     * @return the result of the retry process execution
     */
    @WithSpan
    private boolean processVerifying(ContractNegotiation negotiation) {
        ContractAgreementVerificationMessage verification = ContractAgreementVerificationMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .processId(negotiation.getId())
                .build();

        return this.entityRetryProcessFactory
                .doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, verification))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((current, response) -> transitionToVerified(current))
                .onFailure((current, failure) -> transitionToVerifying(current))
                .onRetryExhausted((current, failure) -> transitionToTerminating(current,
                        String.format("Failed to send %s to provider: %s", verification.getClass().getSimpleName(), failure.getMessage())))
                .execute(String.format("[consumer] send %s", verification.getClass().getSimpleName()));
    }

    /**
     * Handles a negotiation in state TERMINATING by sending a {@link ContractNegotiationTerminationMessage}
     * carrying the negotiation's error detail as rejection reason.
     *
     * <p>Outcome handling: success moves to terminated, a failure moves back to terminating, and a delay
     * invokes {@code breakLease}. When retries are exhausted the negotiation is moved to terminated
     * anyway, with an error detail describing the failed dispatch.
     *
     * @return the result of the retry process execution
     */
    @WithSpan
    private boolean processTerminating(ContractNegotiation negotiation) {
        ContractNegotiationTerminationMessage termination = ContractNegotiationTerminationMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .processId(negotiation.getId())
                .rejectionReason(negotiation.getErrorDetail())
                .build();

        return this.entityRetryProcessFactory
                .doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, termination))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((current, response) -> transitionToTerminated(current))
                .onFailure((current, failure) -> transitionToTerminating(current))
                .onRetryExhausted((current, failure) -> transitionToTerminated(current,
                        String.format("Failed to send %s to provider: %s", termination.getClass().getSimpleName(), failure.getMessage())))
                .execute("[Consumer] send rejection");
    }

    /**
     * Creates a processor that fetches up to {@code batchSize} not-leased consumer negotiations in the
     * given state from the store and applies {@code function} to them, wrapped in the telemetry
     * context propagation middleware.
     *
     * @param contractNegotiationStates the state whose code is used as store filter
     * @param function                  handler invoked for each fetched negotiation
     * @return the configured state processor
     */
    private StateProcessorImpl<ContractNegotiation> processNegotiationsInState(ContractNegotiationStates contractNegotiationStates, Function<ContractNegotiation, Boolean> function) {
        Criterion[] filter = {
                StateEntityStore.hasState(contractNegotiationStates.code()),
                new Criterion(TYPE_FIELD, "=", ContractNegotiation.Type.CONSUMER.name())
        };
        return new StateProcessorImpl<>(
                () -> this.negotiationStore.nextNotLeased(this.batchSize, filter),
                this.telemetry.contextPropagationMiddleware(function));
    }

    /**
     * Creates a processor that dequeues up to {@value #COMMAND_BATCH_SIZE} commands per iteration and
     * applies {@code function} to them.
     *
     * @param function handler invoked for each dequeued command
     * @return the configured command processor
     */
    private StateProcessorImpl<ContractNegotiationCommand> onCommands(Function<ContractNegotiationCommand, Boolean> function) {
        return new StateProcessorImpl<>(() -> this.commandQueue.dequeue(COMMAND_BATCH_SIZE), function);
    }

    /**
     * Delegates a single command to the command processor.
     *
     * @return the value returned by the command processor
     */
    private boolean processCommand(ContractNegotiationCommand command) {
        return this.commandProcessor.processCommandQueue(command);
    }

    /**
     * Builder for {@link ConsumerContractNegotiationManagerImpl}; the configuration methods are
     * inherited from {@link AbstractContractNegotiationManager.Builder}.
     */
    public static class Builder extends AbstractContractNegotiationManager.Builder<ConsumerContractNegotiationManagerImpl> {

        private Builder() {
            super(new ConsumerContractNegotiationManagerImpl());
        }

        /**
         * @return a new builder wrapping a fresh manager instance
         */
        public static Builder newInstance() {
            return new Builder();
        }
    }
}
