package org.cardanofoundation.hydra.reactor;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import org.cardanofoundation.hydra.client.HydraClientOptions;
import org.cardanofoundation.hydra.client.HydraQueryEventListener;
import org.cardanofoundation.hydra.client.HydraWSClient;
import org.cardanofoundation.hydra.core.HydraException;
import org.cardanofoundation.hydra.core.model.HydraState;
import org.cardanofoundation.hydra.core.model.Request;
import org.cardanofoundation.hydra.core.model.Tag;
import org.cardanofoundation.hydra.core.model.query.response.CommittedResponse;
import org.cardanofoundation.hydra.core.model.query.response.GetUTxOResponse;
import org.cardanofoundation.hydra.core.model.query.response.GreetingsResponse;
import org.cardanofoundation.hydra.core.model.query.response.HeadIsAbortedResponse;
import org.cardanofoundation.hydra.core.model.query.response.HeadIsClosedResponse;
import org.cardanofoundation.hydra.core.model.query.response.HeadIsFinalizedResponse;
import org.cardanofoundation.hydra.core.model.query.response.HeadIsInitializingResponse;
import org.cardanofoundation.hydra.core.model.query.response.HeadIsOpenResponse;
import org.cardanofoundation.hydra.core.model.query.response.PostTxOnChainFailedResponse;
import org.cardanofoundation.hydra.core.model.query.response.Response;
import org.cardanofoundation.hydra.core.model.query.response.SnapshotConfirmed;
import org.cardanofoundation.hydra.core.model.query.response.TxInvalidResponse;
import org.cardanofoundation.hydra.core.model.query.response.TxValidResponse;
import org.cardanofoundation.hydra.core.store.UTxOStore;
import org.cardanofoundation.hydra.core.utils.HexUtils;
import org.cardanofoundation.hydra.reactor.commands.AbortHeadCommand;
import org.cardanofoundation.hydra.reactor.commands.CloseHeadCommand;
import org.cardanofoundation.hydra.reactor.commands.CommittedCommand;
import org.cardanofoundation.hydra.reactor.commands.ConnectCommand;
import org.cardanofoundation.hydra.reactor.commands.FanOutHeadCommand;
import org.cardanofoundation.hydra.reactor.commands.GetUTxOCommand;
import org.cardanofoundation.hydra.reactor.commands.InitHeadCommand;
import org.cardanofoundation.hydra.reactor.commands.TxSubmitGlobalCommand;
import org.cardanofoundation.hydra.reactor.commands.TxSubmitLocalCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:org/cardanofoundation/hydra/reactor/HydraReactiveClient.class */
/**
 * Reactor-based facade over {@link HydraWSClient}.
 *
 * <p>Every command method sends a request through the WebSocket client and returns a
 * {@link Mono} that is completed from {@link #onResponse(Response)} once the matching response
 * arrives. Pending requests are tracked as {@link MonoSink}s in a map keyed by the command key,
 * so at most one request per key is in flight; calling a command again while its key is still
 * pending logs a warning and returns an empty Mono.
 *
 * <p>All command methods return an empty Mono when no connection has been opened yet (see
 * {@link #openConnection()}) or when the head is not in the state required by the command.
 */
public class HydraReactiveClient extends HydraQueryEventListener.Stub {
    private static final Logger log = LoggerFactory.getLogger(HydraReactiveClient.class);

    /** Timeout applied to every command when the caller does not supply one. */
    private static final Duration DEF_TIMEOUT_DURATION = Duration.ofMinutes(1L);

    /** Underlying WebSocket client; {@code null} until {@link #openConnection()} and after close. */
    @Nullable
    private HydraWSClient hydraWSClient;
    private final HydraClientOptions hydraClientOptions;
    private final Duration timeout;

    /**
     * Pending requests by command key. The sinks have heterogeneous result types, which is why
     * the raw {@code MonoSink} type (also exposed by the protected accessors) is kept here.
     */
    @SuppressWarnings("rawtypes")
    private final Map<String, MonoSink> monoSinkMap = new ConcurrentHashMap<>();

    /**
     * Creates a client that uses the default command timeout of one minute.
     *
     * @param uTxOStore store that receives the latest UTxO set reported by the head
     * @param str       value handed to {@code HydraClientOptions.builder(...)}
     */
    public HydraReactiveClient(UTxOStore uTxOStore, String str) {
        this(uTxOStore, str, DEF_TIMEOUT_DURATION);
    }

    /**
     * Creates a client with an explicit command timeout. No connection is opened here; call
     * {@link #openConnection()} for that.
     *
     * @param uTxOStore store that receives the latest UTxO set reported by the head
     * @param str       value handed to {@code HydraClientOptions.builder(...)}
     * @param duration  how long each command waits for its response before failing with a
     *                  {@link TimeoutException}
     */
    public HydraReactiveClient(UTxOStore uTxOStore, String str, Duration duration) {
        this.timeout = duration;
        this.hydraClientOptions = HydraClientOptions.builder(str)
                .utxoStore(uTxOStore)
                .transactionFormat(HydraClientOptions.TransactionFormat.JSON)
                .history(false)
                .snapshotUtxo(true)
                .build();
    }

    /**
     * Lazily creates the WebSocket client, registers this instance as its query listener and
     * drops any sinks left over from a previous connection.
     *
     * @return the existing or newly created client, never {@code null}
     */
    private HydraWSClient initWSClient() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            client = new HydraWSClient(this.hydraClientOptions);
            client.addHydraQueryEventListener(this);
            this.monoSinkMap.clear();
            this.hydraWSClient = client;
        }
        return client;
    }

    /**
     * Streams head state transitions.
     *
     * @return a Flux fed by a state listener that is added on subscription and removed when the
     *         Flux terminates or is cancelled; an empty Flux when no connection has been opened
     */
    public Flux<BiHydraState> getHydraStatesStream() {
        // Capture the client once: the field may be nulled by closeConnection() before the
        // subscription or the doFinally hook runs.
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Flux.empty();
        }
        FluxSinkHydraStateAdapter stateAdapter = new FluxSinkHydraStateAdapter();
        return Flux.<BiHydraState>create(sink -> {
            stateAdapter.setSink(sink);
            client.addHydraStateEventListener(stateAdapter);
        }).doFinally(signalType -> {
            log.debug("Removing hydra state event listener...");
            client.removeHydraStateEventListener(stateAdapter);
        });
    }

    /**
     * Streams every response received from the head.
     *
     * @return a Flux fed by a query listener that is added on subscription and removed when the
     *         Flux terminates or is cancelled; an empty Flux when no connection has been opened
     */
    public Flux<Response> getHydraResponsesStream() {
        // Same capture as in getHydraStatesStream(): never dereference the nullable field lazily.
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Flux.empty();
        }
        FluxSinkResponseAdapter responseAdapter = new FluxSinkResponseAdapter();
        return Flux.<Response>create(sink -> {
            responseAdapter.setSink(sink);
            client.addHydraQueryEventListener(responseAdapter);
        }).doFinally(signalType -> {
            log.debug("Removing hydra query event listener...");
            client.removeHydraQueryEventListener(responseAdapter);
        });
    }

    /** Detaches all listeners from the current client (if any) and forgets it and all pending sinks. */
    private void destroyWSClient() {
        HydraWSClient client = this.hydraWSClient;
        if (client != null) {
            client.clearAllHydraQueryEventListeners();
            client.clearAllHydraStateEventListeners();
        }
        this.monoSinkMap.clear();
        this.hydraWSClient = null;
    }

    /**
     * Dispatches a response from the head: updates the UTxO store for responses that carry a
     * UTxO set and completes (or fails) the pending sink registered under the matching command
     * key. Responses without a pending sink are ignored apart from the store update.
     *
     * @param response response received from the Hydra node
     */
    @Override
    public void onResponse(Response response) {
        log.debug("Tag:{}, seq:{}", response.getTag(), response.getSeq());
        log.debug("monoSinkMap current size: {}", this.monoSinkMap.size());

        if (response instanceof GreetingsResponse) {
            GreetingsResponse greetings = (GreetingsResponse) response;
            this.hydraClientOptions.getUtxoStore().storeLatestUtxO(greetings.getSnapshotUtxo());
            applyMonoSuccess(ConnectCommand.key(), greetings);
        }
        if (response instanceof HeadIsOpenResponse) {
            this.hydraClientOptions.getUtxoStore().storeLatestUtxO(((HeadIsOpenResponse) response).getUtxo());
        }
        if (response instanceof CommittedResponse) {
            CommittedResponse committed = (CommittedResponse) response;
            this.hydraClientOptions.getUtxoStore().storeLatestUtxO(committed.getUtxo());
            applyMonoSuccess(CommittedCommand.key(), committed);
        }
        if (response instanceof HeadIsClosedResponse) {
            applyMonoSuccess(CloseHeadCommand.key(), response);
        }
        if (response instanceof SnapshotConfirmed) {
            SnapshotConfirmed snapshotConfirmed = (SnapshotConfirmed) response;
            this.hydraClientOptions.getUtxoStore().storeLatestUtxO(snapshotConfirmed.getSnapshot().getUtxo());
            // Each confirmed transaction resolves the "global" (full confirmation) request for it.
            for (String confirmedTxId : snapshotConfirmed.getSnapshot().getConfirmedTransactions()) {
                applyMonoSuccess(TxSubmitGlobalCommand.of(confirmedTxId).key(), new TxConfirmedResult(confirmedTxId));
            }
        }
        if (response instanceof HeadIsInitializingResponse) {
            applyMonoSuccess(InitHeadCommand.key(), response);
        }
        if (response instanceof HeadIsAbortedResponse) {
            applyMonoSuccess(AbortHeadCommand.key(), response);
        }
        if (response instanceof HeadIsFinalizedResponse) {
            applyMonoSuccess(FanOutHeadCommand.key(), response);
        }
        if ((response instanceof PostTxOnChainFailedResponse)
                && ((PostTxOnChainFailedResponse) response).getPostChainTx().getTag() == Tag.FanoutTx) {
            applyMonoError(FanOutHeadCommand.key(), "Fanout failed.");
        }
        if (response instanceof TxValidResponse) {
            JsonNode transaction = ((TxValidResponse) response).getTransaction();
            String txId = transaction.get("id").asText();
            applyMonoSuccess(TxSubmitLocalCommand.of(txId).key(),
                    new TxResult(txId, transaction.get("isValid").asBoolean()));
        }
        if (response instanceof TxInvalidResponse) {
            TxInvalidResponse txInvalid = (TxInvalidResponse) response;
            JsonNode transaction = txInvalid.getTransaction();
            String txId = transaction.get("id").asText();
            String reason = txInvalid.getValidationError().getReason();
            // Order matters: completing the local request may synchronously register the global
            // sink (see submitTxFullConfirmation), which the next statement then fails.
            applyMonoSuccess(TxSubmitLocalCommand.of(txId).key(),
                    new TxResult(txId, transaction.get("isValid").asBoolean(), reason));
            applyMonoError(TxSubmitGlobalCommand.of(txId).key(),
                    String.format("TransactionId: %s is invalid, reason: %s", txId, reason));
        }
        if (response instanceof GetUTxOResponse) {
            applyMonoSuccess(GetUTxOCommand.key(), response);
        }
    }

    /**
     * @return the state reported by the WebSocket client, or {@link HydraState#Unknown} when no
     *         connection has been opened
     */
    public HydraState getHydraState() {
        HydraWSClient client = this.hydraWSClient;
        return client == null ? HydraState.Unknown : client.getHydraState();
    }

    /**
     * Requests the current UTxO set; requires the head to be {@link HydraState#Open}.
     *
     * @return a Mono completed with the {@link GetUTxOResponse}, failed with
     *         {@link TimeoutException} on timeout, or empty when not connected, not open or a
     *         request is already pending
     */
    public Mono<GetUTxOResponse> getUTxOs() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.Open) {
            log.warn("Hydra head is not open yet!");
            return Mono.empty();
        }
        return sendCommand(
                GetUTxOCommand.key(),
                "getUTxO request already in progress...",
                "getUTxOs request timeout!",
                () -> client.getUTXO());
    }

    /**
     * Submits a transaction and waits for the node's local verdict (TxValid / TxInvalid);
     * requires the head to be {@link HydraState#Open}.
     *
     * @param str  transaction id used to correlate the response
     * @param bArr transaction bytes; sent hex-encoded
     * @return a Mono completed with the {@link TxResult}, failed with {@link TimeoutException}
     *         on timeout, or empty when not connected, not open or a submit for this id is
     *         already pending
     */
    public Mono<TxResult> submitTx(String str, byte[] bArr) {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.Open) {
            log.warn("Hydra head is not open yet!");
            return Mono.empty();
        }
        return sendCommand(
                TxSubmitLocalCommand.of(str).key(),
                "tx submit request already in progress...",
                "TxLocalSubmit timeout, txId: " + str,
                () -> client.submitTx(HexUtils.encodeHexString(bArr)));
    }

    /**
     * Submits a transaction and waits until it shows up in a confirmed snapshot; requires the
     * head to be {@link HydraState#Open}.
     *
     * <p>The local verdict is awaited first via {@link #submitTx(String, byte[])}; only then is
     * the sink for the snapshot confirmation registered. An invalid transaction fails the
     * returned Mono with a {@link HydraException}.
     *
     * @param str  transaction id used to correlate the responses
     * @param bArr transaction bytes; sent hex-encoded
     * @return a Mono completed with the {@link TxResult}, failed with {@link TimeoutException}
     *         on timeout, or empty when not connected, not open or a submit for this id is
     *         already pending
     */
    public Mono<TxResult> submitTxFullConfirmation(String str, byte[] bArr) {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.Open) {
            log.warn("Hydra head is not open yet!");
            return Mono.empty();
        }
        String key = TxSubmitGlobalCommand.of(str).key();
        Mono<TxResult> localSubmit = submitTx(str, bArr);
        if (getMonoSink(key).isPresent()) {
            log.warn("tx submit request already in progress...");
            return Mono.empty();
        }
        return localSubmit
                .<TxResult>flatMap(localResult -> Mono.<TxConfirmedResult>create(sink -> registerSink(key, sink))
                        .map(confirmed -> new TxResult(confirmed.txId(), localResult.isValid(), localResult.getReason())))
                .timeout(this.timeout, Mono.defer(() -> {
                    applyMonoCleanup(key);
                    return Mono.error(new TimeoutException("TxGlobalSubmit timeout, txId: " + str));
                }));
    }

    /**
     * Shared implementation of the "register sink, fire request, wait with timeout" pattern.
     *
     * @param key               command key the response will be dispatched under
     * @param inProgressWarning message logged when a request for {@code key} is already pending
     * @param timeoutMessage    message of the {@link TimeoutException} raised on timeout
     * @param action            sends the request; runs on subscription, after the sink is stored
     * @param <T>               response type the returned Mono is completed with
     * @return the pending-response Mono, or an empty Mono when a request is already pending
     */
    private <T> Mono<T> sendCommand(String key, String inProgressWarning, String timeoutMessage, Runnable action) {
        if (getMonoSink(key).isPresent()) {
            log.warn(inProgressWarning);
            return Mono.empty();
        }
        return Mono.<T>create(sink -> {
            registerSink(key, sink);
            action.run();
        }).timeout(this.timeout, Mono.defer(() -> {
            applyMonoCleanup(key);
            return Mono.error(new TimeoutException(timeoutMessage));
        }));
    }

    /**
     * Stores the sink under {@code key} and makes it deregister itself when it is disposed, so a
     * cancelled subscription does not leave a stale entry that blocks later requests. The
     * two-argument remove only deletes the entry if it still maps to this very sink.
     */
    private void registerSink(String key, MonoSink<?> sink) {
        storeMonoSinkReference(key, sink);
        sink.onDispose(() -> this.monoSinkMap.remove(key, sink));
    }

    /**
     * Registers a pending sink, replacing any sink previously stored under the same key.
     *
     * @param str      command key
     * @param monoSink sink to complete when the matching response arrives
     */
    @SuppressWarnings("rawtypes")
    protected void storeMonoSinkReference(String str, MonoSink monoSink) {
        this.monoSinkMap.put(str, monoSink);
    }

    /**
     * @param str command key
     * @return the sink currently pending under the key, if any
     */
    @SuppressWarnings("rawtypes")
    protected Optional<MonoSink> getMonoSink(String str) {
        return Optional.ofNullable(this.monoSinkMap.get(str));
    }

    /**
     * Removes the sink pending under the key and completes it with the given value; does nothing
     * when no sink is pending.
     *
     * @param str command key
     * @param obj value to complete the sink with
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected <T extends Request> void applyMonoSuccess(String str, Object obj) {
        MonoSink sink = this.monoSinkMap.remove(str);
        if (sink != null) {
            sink.success(obj);
        }
    }

    /**
     * Removes the sink pending under the key and completes it without a value; does nothing when
     * no sink is pending.
     *
     * @param str command key
     */
    @SuppressWarnings("rawtypes")
    protected <T extends Request> void applyMonoSuccess(String str) {
        MonoSink sink = this.monoSinkMap.remove(str);
        if (sink != null) {
            sink.success();
        }
    }

    /**
     * Removes the sink pending under the key and fails it with a {@link HydraException} whose
     * message is {@code String.valueOf(obj)}; does nothing when no sink is pending.
     *
     * @param str command key
     * @param obj error description
     */
    @SuppressWarnings("rawtypes")
    protected <T extends Request> void applyMonoError(String str, Object obj) {
        MonoSink sink = this.monoSinkMap.remove(str);
        if (sink != null) {
            sink.error(new HydraException(String.valueOf(obj)));
        }
    }

    /**
     * Drops the sink pending under the key without signalling it.
     *
     * @param str command key
     */
    protected void applyMonoCleanup(String str) {
        this.monoSinkMap.remove(str);
    }

    /**
     * Creates the WebSocket client if necessary and connects it.
     *
     * @return a Mono completed with the {@link GreetingsResponse}, failed with
     *         {@link TimeoutException} on timeout, or empty when the connection is already open
     *         or a connect is already pending
     */
    public Mono<GreetingsResponse> openConnection() {
        HydraWSClient client = initWSClient();
        if (client.isOpen()) {
            log.warn("Connection already open!");
            return Mono.empty();
        }
        return sendCommand(
                ConnectCommand.key(),
                "connect request already in progress...",
                "head connect timeout!",
                () -> client.connect());
    }

    /**
     * Closes the connection, blocking the calling thread until the close has finished, then
     * discards the client, its listeners and all pending sinks.
     *
     * @return {@code Mono.just(true)} after closing, or an empty Mono when no connection exists
     * @throws InterruptedException if the thread is interrupted while waiting for the close
     */
    public Mono<Boolean> closeConnection() throws InterruptedException {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        client.closeBlocking();
        destroyWSClient();
        return Mono.just(true);
    }

    /**
     * Aborts the head; requires the head to be {@link HydraState#Initializing}.
     *
     * @return a Mono completed with the {@link HeadIsAbortedResponse}, failed with
     *         {@link TimeoutException} on timeout, or empty when not connected, in the wrong
     *         state or an abort is already pending
     */
    public Mono<HeadIsAbortedResponse> abortHead() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.Initializing) {
            log.warn("Hydra not in initializing state...");
            return Mono.empty();
        }
        return sendCommand(
                AbortHeadCommand.key(),
                "abortHead request already in progress...",
                "abortHead timeout!",
                () -> client.abort());
    }

    /**
     * Initializes the head; requires the head to be {@link HydraState#Idle} or
     * {@link HydraState#Final}.
     *
     * @return a Mono completed with the {@link HeadIsInitializingResponse}, failed with
     *         {@link TimeoutException} on timeout, or empty when not connected, in the wrong
     *         state or an init is already pending
     */
    public Mono<HeadIsInitializingResponse> initHead() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        HydraState state = client.getHydraState();
        if (state != HydraState.Idle && state != HydraState.Final) {
            log.warn("Hydra needs to be either in Idle or Final state!");
            return Mono.empty();
        }
        return sendCommand(
                InitHeadCommand.key(),
                "init head request already in progress!",
                "initHead timeout",
                () -> client.init());
    }

    /**
     * Fans out the head; requires the head to be {@link HydraState#FanoutPossible}.
     *
     * @return a Mono completed with the {@link HeadIsFinalizedResponse}, failed with
     *         {@link HydraException} when posting the fanout transaction fails, failed with
     *         {@link TimeoutException} on timeout, or empty when not connected, in the wrong
     *         state or a fanout is already pending
     */
    public Mono<HeadIsFinalizedResponse> fanOutHead() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.FanoutPossible) {
            log.warn("Hydra needs to be in FanoutPossible state!");
            return Mono.empty();
        }
        return sendCommand(
                FanOutHeadCommand.key(),
                "fanOutHead request already in progress!",
                "fanOutHead timeout!",
                () -> client.fanOut());
    }

    /**
     * Closes the head; requires the head to be {@link HydraState#Open}.
     *
     * @return a Mono completed with the {@link HeadIsClosedResponse}, failed with
     *         {@link TimeoutException} on timeout, or empty when not connected, in the wrong
     *         state or a close is already pending
     */
    public Mono<HeadIsClosedResponse> closeHead() {
        HydraWSClient client = this.hydraWSClient;
        if (client == null) {
            return Mono.empty();
        }
        if (client.getHydraState() != HydraState.Open) {
            log.warn("Hydra needs to be in the Open state!");
            return Mono.empty();
        }
        return sendCommand(
                CloseHeadCommand.key(),
                "closeHead request already in progress!",
                "closeHead timeout!",
                () -> client.closeHead());
    }
}
