package com.daml.ledger.rxjava.components;

import com.daml.ledger.javaapi.data.CompletionStreamResponse;
import com.daml.ledger.javaapi.data.LedgerOffset;
import com.daml.ledger.javaapi.data.SubmitCommandsRequest;
import com.daml.ledger.javaapi.data.TransactionFilter;
import com.daml.ledger.javaapi.data.WorkflowEvent;
import com.daml.ledger.rxjava.CommandSubmissionClient;
import com.daml.ledger.rxjava.LedgerClient;
import com.daml.ledger.rxjava.TransactionsClient;
import com.daml.ledger.rxjava.components.LedgerViewFlowable;
import com.daml.ledger.rxjava.components.helpers.CommandsAndPendingSet;
import com.daml.ledger.rxjava.components.helpers.CreatedContract;
import com.daml.ledger.rxjava.components.helpers.Pair;
import com.daml.ledger.rxjava.util.FlowableLogger;
import com.google.rpc.Code;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.MaybeSource;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/daml/ledger/rxjava/components/Bot.class */
public class Bot {
    private static final Logger logger = LoggerFactory.getLogger(Bot.class);

    static Flowable<LedgerViewFlowable.CompletionFailure> failuresCommandIds(Set<String> set, Flowable<CompletionStreamResponse> flowable) {
        return flowable.concatMapIterable((v0) -> {
            return v0.getCompletions();
        }).filter(completion -> {
            return completion.getStatus().getCode() != Code.OK.getNumber();
        }).map(completion2 -> {
            return new LedgerViewFlowable.CompletionFailure(completion2.getCommandId(), completion2.getStatus());
        });
    }

    public static <R> void wire(String str, LedgerClient ledgerClient, TransactionFilter transactionFilter, Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> function, Function<CreatedContract, R> function2) {
        wire(str, ledgerClient, transactionFilter, function, function2, Schedulers.io());
    }

    public static <R> void wire(String str, LedgerClient ledgerClient, TransactionFilter transactionFilter, Function<LedgerViewFlowable.LedgerView<R>, Flowable<CommandsAndPendingSet>> function, Function<CreatedContract, R> function2, Scheduler scheduler) {
        logger.info("Bot wiring started for parties {}", transactionFilter.getParties());
        TransactionsClient transactionsClient = ledgerClient.getTransactionsClient();
        LedgerViewFlowable.ledgerViewAndOffsetFromACS(Flowable.empty(), function2).observeOn(scheduler).flatMap(pair -> {
            LedgerViewFlowable.LedgerView ledgerView = (LedgerViewFlowable.LedgerView) pair.getFirst();
            LedgerOffset ledgerOffset = (LedgerOffset) pair.getSecond();
            logger.debug("LedgerView accumulated from acs completed. Offset: {} LedgerView: {}", ledgerOffset, ledgerOffset);
            return ledgerClient.getTransactionsClient().getLedgerEnd().flatMap(ledgerOffset2 -> {
                logger.debug("LedgerEnd: {}", ledgerOffset2);
                return LedgerViewFlowable.ledgerViewFromFlowable(ledgerView, FlowableLogger.log(transactionsClient.getTransactions(ledgerOffset, ledgerOffset2, transactionFilter, true), "initTransactions").map(transaction -> {
                    return transaction;
                }), function2).map(ledgerView2 -> {
                    return new Pair(ledgerView2, ledgerOffset2);
                });
            });
        }).doOnSuccess(pair2 -> {
            LedgerViewFlowable.LedgerView ledgerView = (LedgerViewFlowable.LedgerView) pair2.getFirst();
            LedgerOffset ledgerOffset = (LedgerOffset) pair2.getSecond();
            logger.debug("LedgerView accumulated from acs and transactions completed. Offset: {} LedgerView: {}", ledgerOffset, ledgerView);
            Flowable observeOn = FlowableLogger.log(transactionsClient.getTransactions(ledgerOffset, transactionFilter, true), "transactions").observeOn(scheduler);
            Flowable observeOn2 = FlowableLogger.log(failuresCommandIds(transactionFilter.getParties(), ledgerClient.getCommandCompletionClient().completionStream(str, (LedgerOffset) LedgerOffset.LedgerEnd.getInstance(), transactionFilter.getParties())), "completionFailures").observeOn(scheduler);
            ReplaySubject create = ReplaySubject.create();
            ReplaySubject create2 = ReplaySubject.create();
            Flowable share = LedgerViewFlowable.of(ledgerView, FlowableLogger.log(create.toFlowable(BackpressureStrategy.BUFFER), "submissionsFailures").observeOn(scheduler), observeOn2, observeOn.map(transaction -> {
                return transaction;
            }), FlowableLogger.log(create2.toFlowable(BackpressureStrategy.BUFFER), "commandsAndPendingSet").observeOn(scheduler), function2).concatMap(ledgerView2 -> {
                Flowable error;
                try {
                    error = Flowable.concat((Flowable) function.apply(ledgerView2), Flowable.just(CommandsAndPendingSet.empty));
                } catch (Throwable th) {
                    logger.error("Error during execution of bot.", th);
                    error = Flowable.error(th);
                }
                return FlowableLogger.log(error, "bot.execution");
            }).share();
            share.filter(commandsAndPendingSet -> {
                return !commandsAndPendingSet.getSubmitCommandsRequest().getCommandId().isEmpty();
            }).map((v0) -> {
                return v0.getSubmitCommandsRequest();
            }).concatMapMaybe(commandsFailuresFromSubmissions(ledgerClient.getCommandSubmissionClient())).toObservable().subscribe(create);
            share.toObservable().subscribe(create2);
            logger.info("Bot wiring complete for parties {}", transactionFilter.getParties());
        }).toFlowable().observeOn(scheduler).publish().connect();
    }

    public static void wireSimple(String str, LedgerClient ledgerClient, TransactionFilter transactionFilter, Function<LedgerViewFlowable.LedgerView<CreatedContract>, Flowable<CommandsAndPendingSet>> function) {
        wire(str, ledgerClient, transactionFilter, function, createdContract -> {
            return createdContract;
        });
    }

    static Flowable<WorkflowEvent> activeContractSetAndNewTransactions(LedgerClient ledgerClient, TransactionFilter transactionFilter) {
        CompletableFuture completableFuture = new CompletableFuture();
        BiConsumer biConsumer = (ledgerOffset, str) -> {
            if (completableFuture.isDone()) {
                return;
            }
            logger.debug(str + ".completeOffsetFuture: " + ledgerOffset);
            completableFuture.complete(ledgerOffset);
        };
        return Flowable.concat(FlowableLogger.log(ledgerClient.getActiveContractSetClient().getActiveContracts(transactionFilter, true), "acs").doOnNext(getActiveContractsResponse -> {
            getActiveContractsResponse.getOffset().ifPresent(str2 -> {
                biConsumer.accept(new LedgerOffset.Absolute(str2), "acs.next");
            });
        }).doOnComplete(() -> {
            biConsumer.accept(LedgerOffset.LedgerBegin.getInstance(), "acs.complete");
        }), Single.fromFuture(completableFuture).doOnSuccess(ledgerOffset2 -> {
            logger.debug("offset.success: " + ledgerOffset2);
        }).doOnError(th -> {
            logger.error("offset.error: " + th);
        }).flatMapPublisher(ledgerOffset3 -> {
            return FlowableLogger.log(ledgerClient.getTransactionsClient().getTransactions(ledgerOffset3, transactionFilter, true), "transactions");
        }));
    }

    private static io.reactivex.functions.Function<SubmitCommandsRequest, MaybeSource<? extends LedgerViewFlowable.SubmissionFailure>> commandsFailuresFromSubmissions(CommandSubmissionClient commandSubmissionClient) {
        return submitCommandsRequest -> {
            logger.debug("Submitting: {}", submitCommandsRequest);
            return FlowableLogger.log(commandSubmissionClient.submit(submitCommandsRequest.getWorkflowId(), submitCommandsRequest.getApplicationId(), submitCommandsRequest.getCommandId(), submitCommandsRequest.getParty(), submitCommandsRequest.getMinLedgerTimeAbsolute(), submitCommandsRequest.getMinLedgerTimeRelative(), submitCommandsRequest.getDeduplicationTime(), submitCommandsRequest.getCommands()).flatMapMaybe(empty -> {
                return Maybe.empty();
            }).doOnError(th -> {
                logger.error("Error submitting commands {} for party {}: {}", new Object[]{submitCommandsRequest.getCommandId(), submitCommandsRequest.getParty(), th.getMessage()});
            }).onErrorReturn(th2 -> {
                return new LedgerViewFlowable.SubmissionFailure(submitCommandsRequest.getCommandId(), th2);
            }), "commandSubmissions");
        };
    }
}
