package io.knotx.databridge.core.impl;

import io.knotx.databridge.core.DataBridgeKnotOptions;
import io.knotx.databridge.core.datasource.DataSourceEntry;
import io.knotx.databridge.core.datasource.DataSourcesEngine;
import io.knotx.dataobjects.Fragment;
import io.knotx.dataobjects.KnotContext;
import io.knotx.exceptions.FragmentProcessingException;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.reactivex.core.Vertx;
import java.util.concurrent.ExecutionException;

/* loaded from: input_file:io/knotx/databridge/core/impl/FragmentProcessor.class */
public class FragmentProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(FragmentProcessor.class);
    private final DataSourcesEngine serviceEngine;

    public FragmentProcessor(Vertx vertx, DataBridgeKnotOptions dataBridgeKnotOptions) {
        this.serviceEngine = new DataSourcesEngine(vertx, dataBridgeKnotOptions);
    }

    public Single<FragmentContext> processSnippet(FragmentContext fragmentContext, KnotContext knotContext) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("Processing Handlebars snippet {}", new Object[]{fragmentContext.fragment()});
        }
        Observable flatMap = Observable.just(fragmentContext).flatMap((v0) -> {
            return v0.services();
        });
        DataSourcesEngine dataSourcesEngine = this.serviceEngine;
        dataSourcesEngine.getClass();
        return flatMap.map(dataSourcesEngine::mergeWithConfiguration).doOnNext(this::traceService).flatMap(dataSourceEntry -> {
            Observable observable = fetchServiceData(dataSourceEntry, knotContext).toObservable();
            dataSourceEntry.getClass();
            return observable.map(dataSourceEntry::getResultWithNamespaceAsKey).doOnError(th -> {
                storeErrorInFragment(fragmentContext.fragment(), th, dataSourceEntry.getName());
            });
        }).reduce(new JsonObject(), (v0, v1) -> {
            return v0.mergeIn(v1);
        }).map(jsonObject -> {
            return applyData(fragmentContext, jsonObject);
        }).onErrorReturn(th -> {
            return handleError(fragmentContext, knotContext, th);
        });
    }

    private Single<JsonObject> fetchServiceData(DataSourceEntry dataSourceEntry, KnotContext knotContext) {
        LOGGER.debug("Fetching data from service {} {}", new Object[]{dataSourceEntry.getAddress(), dataSourceEntry.getParams()});
        try {
            return (Single) knotContext.getCache().get(dataSourceEntry.getCacheKey(), () -> {
                LOGGER.debug("Requesting data from adapter {} with params {}", new Object[]{dataSourceEntry.getAddress(), dataSourceEntry.getParams()});
                return this.serviceEngine.doServiceCall(dataSourceEntry, knotContext).cache();
            });
        } catch (ExecutionException e) {
            LOGGER.fatal("Unable to get service data {}", e);
            return Single.error(e);
        }
    }

    private FragmentContext applyData(FragmentContext fragmentContext, JsonObject jsonObject) {
        LOGGER.trace("Applying data to snippet {}", new Object[]{fragmentContext});
        fragmentContext.fragment().context().mergeIn(jsonObject);
        return fragmentContext;
    }

    private void traceService(DataSourceEntry dataSourceEntry) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Found service call definition: {} {}", new Object[]{dataSourceEntry.getAddress(), dataSourceEntry.getParams()});
        }
    }

    private void storeErrorInFragment(Fragment fragment, Throwable th, String str) {
        LOGGER.error("Data Bridge service {} failed. Cause: {}", new Object[]{str, th.getMessage()});
        fragment.failure("databridge", th);
    }

    private FragmentContext handleError(FragmentContext fragmentContext, KnotContext knotContext, Throwable th) {
        LOGGER.error("Fragment processing failed. Cause:{}\nRequest:\n{}\nFragmentContext:\n{}\n", new Object[]{th.getMessage(), knotContext.getClientRequest(), fragmentContext});
        fragmentContext.fragment().failure("databridge", th);
        if (fragmentContext.fragment().fallback().isPresent()) {
            return fragmentContext;
        }
        throw new FragmentProcessingException(String.format("Fragment processing failed in %s", "databridge"), th);
    }
}
