package org.eclipse.edc.connector.contract.negotiation;

import io.opentelemetry.extension.annotations.WithSpan;
import java.util.Objects;
import java.util.function.Function;
import org.eclipse.edc.connector.contract.negotiation.AbstractContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.ContractId;
import org.eclipse.edc.connector.contract.spi.negotiation.ProviderContractNegotiationManager;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractNegotiationEventMessage;
import org.eclipse.edc.connector.contract.spi.types.command.ContractNegotiationCommand;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage;
import org.eclipse.edc.connector.contract.spi.types.offer.ContractOffer;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.persistence.StateEntityStore;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.statemachine.StateMachineManager;
import org.eclipse.edc.statemachine.StateProcessorImpl;
import org.eclipse.edc.statemachine.retry.CompletableFutureRetryProcess;

/* loaded from: input_file:org/eclipse/edc/connector/contract/negotiation/ProviderContractNegotiationManagerImpl.class */
/**
 * Provider-side contract negotiation manager.
 *
 * <p>On {@link #start()} a {@link StateMachineManager} is built with one processor per handled
 * negotiation state (OFFERING, REQUESTED, AGREEING, VERIFIED, FINALIZING, TERMINATING) plus one
 * processor that drains the command queue. Each state processor fetches not-leased negotiations of
 * type {@code PROVIDER} in the given state from the negotiation store and hands them to the
 * matching {@code process...} method.
 *
 * <p>Instances are created through {@link Builder}.
 */
public class ProviderContractNegotiationManagerImpl extends AbstractContractNegotiationManager implements ProviderContractNegotiationManager {

    /** Maximum number of commands taken from the command queue per processor run. */
    private static final int COMMAND_DEQUEUE_BATCH_SIZE = 5;

    /** Created in {@link #start()}; stays {@code null} until then. */
    private StateMachineManager stateMachineManager;

    /**
     * Builder for {@link ProviderContractNegotiationManagerImpl}; the configuration methods are
     * those of {@link AbstractContractNegotiationManager.Builder}.
     */
    public static class Builder extends AbstractContractNegotiationManager.Builder<ProviderContractNegotiationManagerImpl> {
        private Builder() {
            super(new ProviderContractNegotiationManagerImpl());
        }

        /**
         * Creates a fresh builder.
         *
         * @return a new builder instance
         */
        public static Builder newInstance() {
            return new Builder();
        }
    }

    private ProviderContractNegotiationManagerImpl() {
    }

    /**
     * Builds the state machine manager with all state processors and the command processor, then
     * starts it.
     */
    public void start() {
        this.stateMachineManager = StateMachineManager.Builder
                .newInstance("provider-contract-negotiation", this.monitor, this.executorInstrumentation, this.waitStrategy)
                .processor(processNegotiationsInState(ContractNegotiationStates.OFFERING, this::processOffering))
                .processor(processNegotiationsInState(ContractNegotiationStates.REQUESTED, this::processRequested))
                .processor(processNegotiationsInState(ContractNegotiationStates.AGREEING, this::processAgreeing))
                .processor(processNegotiationsInState(ContractNegotiationStates.VERIFIED, this::processVerified))
                .processor(processNegotiationsInState(ContractNegotiationStates.FINALIZING, this::processFinalizing))
                .processor(processNegotiationsInState(ContractNegotiationStates.TERMINATING, this::processTerminating))
                .processor(onCommands(this::processCommand))
                .build();
        this.stateMachineManager.start();
    }

    /**
     * Stops the state machine manager. Does nothing when {@link #start()} has not been called.
     */
    public void stop() {
        if (this.stateMachineManager != null) {
            this.stateMachineManager.stop();
        }
    }

    /**
     * Puts a command on the command queue; it is picked up later by the command processor
     * registered in {@link #start()}.
     *
     * @param contractNegotiationCommand the command to enqueue
     */
    public void enqueueCommand(ContractNegotiationCommand contractNegotiationCommand) {
        this.commandQueue.enqueue(contractNegotiationCommand);
    }

    /**
     * Creates a processor that fetches up to {@code batchSize} not-leased provider negotiations in
     * the given state and applies {@code function} to them, wrapped in the telemetry
     * context-propagation middleware.
     *
     * @param state the negotiation state to select
     * @param function the handler applied to each selected negotiation
     * @return the state processor
     */
    private StateProcessorImpl<ContractNegotiation> processNegotiationsInState(ContractNegotiationStates state, Function<ContractNegotiation, Boolean> function) {
        Criterion[] filter = {
                StateEntityStore.hasState(state.code()),
                new Criterion("type", "=", ContractNegotiation.Type.PROVIDER.name())
        };
        return new StateProcessorImpl<>(
                () -> this.negotiationStore.nextNotLeased(this.batchSize, filter),
                this.telemetry.contextPropagationMiddleware(function));
    }

    /**
     * Creates a processor that dequeues up to {@link #COMMAND_DEQUEUE_BATCH_SIZE} commands per run
     * and applies {@code function} to them.
     *
     * @param function the handler applied to each dequeued command
     * @return the command processor
     */
    private StateProcessorImpl<ContractNegotiationCommand> onCommands(Function<ContractNegotiationCommand, Boolean> function) {
        return new StateProcessorImpl<>(() -> this.commandQueue.dequeue(COMMAND_DEQUEUE_BATCH_SIZE), function);
    }

    /**
     * Delegates a single command to the command processor.
     *
     * @param command the command to process
     * @return the result reported by the command processor
     */
    private boolean processCommand(ContractNegotiationCommand command) {
        return this.commandProcessor.processCommandQueue(command);
    }

    /**
     * Handles a negotiation in state OFFERING: sends a {@link ContractRequestMessage} carrying the
     * last contract offer to the counter party. On success the negotiation transitions to offered,
     * on failure back to offering, and once retries are exhausted to terminating.
     *
     * @param negotiation the negotiation to process
     * @return the result of executing the retry process
     */
    @WithSpan
    private boolean processOffering(ContractNegotiation negotiation) {
        ContractRequestMessage message = ContractRequestMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .contractOffer(negotiation.getLastContractOffer())
                .processId(negotiation.getCorrelationId())
                .build();

        return this.entityRetryProcessFactory.doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, message))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((n, result) -> transitionToOffered(n))
                .onFailure((n, error) -> transitionToOffering(n))
                .onRetryExhausted((n, error) -> transitionToTerminating(n, describeSendFailure(message, error)))
                .execute("[Provider] send counter offer");
    }

    /**
     * Handles a negotiation in state TERMINATING: sends a
     * {@link ContractNegotiationTerminationMessage} with the negotiation's error detail as
     * rejection reason. On success the negotiation transitions to terminated, on failure back to
     * terminating, and once retries are exhausted to terminated with a failure description.
     *
     * @param negotiation the negotiation to process
     * @return the result of executing the retry process
     */
    @WithSpan
    private boolean processTerminating(ContractNegotiation negotiation) {
        ContractNegotiationTerminationMessage message = ContractNegotiationTerminationMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .processId(negotiation.getCorrelationId())
                .rejectionReason(negotiation.getErrorDetail())
                .build();

        return this.entityRetryProcessFactory.doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, message))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((n, result) -> transitionToTerminated(n))
                .onFailure((n, error) -> transitionToTerminating(n))
                .onRetryExhausted((n, error) -> transitionToTerminated(n, describeSendFailure(message, error)))
                .execute("[Provider] send rejection");
    }

    /**
     * Handles a negotiation in state REQUESTED by moving it straight to agreeing.
     *
     * @param negotiation the negotiation to process
     * @return always {@code true}
     */
    @WithSpan
    private boolean processRequested(ContractNegotiation negotiation) {
        transitionToAgreeing(negotiation);
        return true;
    }

    /**
     * Handles a negotiation in state AGREEING: sends a {@link ContractAgreementMessage} to the
     * counter party. An agreement already attached to the negotiation is reused; otherwise a new
     * one is built from the last contract offer. On success the negotiation transitions to agreed,
     * on failure back to agreeing, and once retries are exhausted to terminating.
     *
     * @param negotiation the negotiation to process
     * @return {@code false} when a new agreement is needed but the last offer's id is not valid;
     *         otherwise the result of executing the retry process
     */
    @WithSpan
    private boolean processAgreeing(ContractNegotiation negotiation) {
        final ContractAgreement agreement;
        final Policy policy;

        ContractAgreement existingAgreement = negotiation.getContractAgreement();
        if (existingAgreement == null) {
            ContractOffer lastOffer = negotiation.getLastContractOffer();
            ContractId offerId = ContractId.parse(lastOffer.getId());
            if (!offerId.isValid()) {
                // NOTE(review): no state transition happens here, so the negotiation presumably stays
                // in AGREEING and is picked up again later - confirm whether it should be terminated.
                this.monitor.severe("ProviderContractNegotiationManagerImpl.processAgreeing(): Offer Id not correctly formatted: " + lastOffer.getId());
                return false;
            }
            policy = lastOffer.getPolicy();
            agreement = ContractAgreement.Builder.newInstance()
                    .id(ContractId.createContractId(offerId.definitionPart(), lastOffer.getAssetId()))
                    .contractSigningDate(this.clock.instant().getEpochSecond())
                    .providerId(this.participantId)
                    .consumerId(negotiation.getCounterPartyId())
                    .policy(policy)
                    .assetId(lastOffer.getAssetId())
                    .build();
        } else {
            agreement = existingAgreement;
            policy = existingAgreement.getPolicy();
        }

        ContractAgreementMessage message = ContractAgreementMessage.Builder.newInstance()
                .protocol(negotiation.getProtocol())
                .connectorId(negotiation.getCounterPartyId())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .contractAgreement(agreement)
                .processId(negotiation.getCorrelationId())
                .policy(policy)
                .build();

        return this.entityRetryProcessFactory.doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, message))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((n, result) -> transitionToAgreed(n, agreement))
                .onFailure((n, error) -> transitionToAgreeing(n))
                .onRetryExhausted((n, error) -> transitionToTerminating(n, describeSendFailure(message, error)))
                .execute("[Provider] send agreement");
    }

    /**
     * Handles a negotiation in state VERIFIED by moving it straight to finalizing.
     *
     * @param negotiation the negotiation to process
     * @return always {@code true}
     */
    @WithSpan
    private boolean processVerified(ContractNegotiation negotiation) {
        transitionToFinalizing(negotiation);
        return true;
    }

    /**
     * Handles a negotiation in state FINALIZING: sends a {@link ContractNegotiationEventMessage}
     * of type FINALIZED to the counter party. On success the negotiation transitions to finalized,
     * on failure back to finalizing, and once retries are exhausted to terminating.
     *
     * @param negotiation the negotiation to process
     * @return the result of executing the retry process
     */
    @WithSpan
    private boolean processFinalizing(ContractNegotiation negotiation) {
        ContractNegotiationEventMessage message = ContractNegotiationEventMessage.Builder.newInstance()
                .type(ContractNegotiationEventMessage.Type.FINALIZED)
                .protocol(negotiation.getProtocol())
                .counterPartyAddress(negotiation.getCounterPartyAddress())
                .processId(negotiation.getCorrelationId())
                .build();

        return this.entityRetryProcessFactory.doAsyncProcess(negotiation, () -> this.dispatcherRegistry.send(Object.class, message))
                .entityRetrieve(this.negotiationStore::findById)
                .onDelay(this::breakLease)
                .onSuccess((n, result) -> transitionToFinalized(n))
                .onFailure((n, error) -> transitionToFinalizing(n))
                .onRetryExhausted((n, error) -> transitionToTerminating(n, describeSendFailure(message, error)))
                .execute("[Provider] send finalization");
    }

    /**
     * Builds the error detail recorded when sending a message has exhausted its retries.
     *
     * @param message the message that could not be sent; only its simple class name is used
     * @param error the last failure
     * @return the formatted failure description
     */
    private static String describeSendFailure(Object message, Throwable error) {
        return String.format("Failed to send %s to consumer: %s", message.getClass().getSimpleName(), error.getMessage());
    }
}
