package io.wcm.caravan.pipeline.impl.operators;

import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.wcm.caravan.commons.metrics.rx.HitsAndMissesCountingMetricsOperator;
import io.wcm.caravan.commons.metrics.rx.TimerMetricsOperator;
import io.wcm.caravan.io.http.request.CaravanHttpRequest;
import io.wcm.caravan.pipeline.JsonPipelineInputException;
import io.wcm.caravan.pipeline.JsonPipelineOutput;
import io.wcm.caravan.pipeline.cache.CacheDateUtils;
import io.wcm.caravan.pipeline.cache.CachePersistencyOptions;
import io.wcm.caravan.pipeline.cache.CacheStrategy;
import io.wcm.caravan.pipeline.cache.spi.CacheAdapter;
import io.wcm.caravan.pipeline.impl.JacksonFunctions;
import io.wcm.caravan.pipeline.impl.JsonPipelineContextImpl;
import io.wcm.caravan.pipeline.impl.JsonPipelineOutputImpl;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/* loaded from: input_file:io/wcm/caravan/pipeline/impl/operators/CachePointTransformer.class */
public class CachePointTransformer implements Observable.Transformer<JsonPipelineOutput, JsonPipelineOutput> {
    private static final Logger log = LoggerFactory.getLogger(CachePointTransformer.class);
    private JsonPipelineContextImpl context;
    private final List<CaravanHttpRequest> requests;
    private final String descriptor;
    private final CacheStrategy strategy;
    private final String correlationId;

    /* loaded from: input_file:io/wcm/caravan/pipeline/impl/operators/CachePointTransformer$CacheEnvelope.class */
    public static final class CacheEnvelope {
        private static final String CACHE_METADATA_PROPERTY = "metadata";
        private static final String CACHE_CONTENT_PROPERTY = "content";
        private final ObjectNode envelopeNode;
        private final ObjectNode metadataNode;
        private final JsonNode contentNode;

        private CacheEnvelope(ObjectNode objectNode) {
            this.envelopeNode = objectNode;
            this.metadataNode = objectNode.get(CACHE_METADATA_PROPERTY);
            this.contentNode = objectNode.get(CACHE_CONTENT_PROPERTY);
        }

        public static CacheEnvelope fromEnvelopeString(String str, String str2) {
            try {
                ObjectNode stringToObjectNode = JacksonFunctions.stringToObjectNode(str);
                if (stringToObjectNode.has(CACHE_METADATA_PROPERTY) && stringToObjectNode.has(CACHE_CONTENT_PROPERTY)) {
                    return new CacheEnvelope(stringToObjectNode);
                }
                CachePointTransformer.log.warn("Ignoring cached document {}, because it doesn't have the expected metadata/content envelope.", str2);
                return null;
            } catch (JsonPipelineInputException e) {
                CachePointTransformer.log.warn("Failed parse cached JSON document from " + str2, e);
                return null;
            }
        }

        public static CacheEnvelope from200Response(JsonNode jsonNode, int i, List<CaravanHttpRequest> list, String str, String str2, Map<String, String> map) {
            return new CacheEnvelope(createEnvelopeNode(jsonNode, 200, i, list, str, str2, null, map));
        }

        public static CacheEnvelope from404Response(String str, int i, List<CaravanHttpRequest> list, String str2, String str3, Map<String, String> map) {
            return new CacheEnvelope(createEnvelopeNode(JacksonFunctions.emptyObject(), 404, i, list, str2, str3, str, map));
        }

        static CacheEnvelope fromContentString(String str, int i) {
            CacheEnvelope cacheEnvelope = new CacheEnvelope(createEnvelopeNode(JacksonFunctions.stringToObjectNode(str), 200, 0, ImmutableList.of(), "Cache-Key", "Descriptor", null, ImmutableMap.of()));
            cacheEnvelope.getMetadataNode().put("generated", CacheDateUtils.formatRelativeTime(-i));
            return cacheEnvelope;
        }

        private static ObjectNode createEnvelopeNode(JsonNode jsonNode, int i, int i2, List<CaravanHttpRequest> list, String str, String str2, String str3, Map<String, String> map) {
            ObjectNode emptyObject = JacksonFunctions.emptyObject();
            ObjectNode putObject = emptyObject.putObject(CACHE_METADATA_PROPERTY);
            putObject.put("cacheKey", str);
            putObject.set("sources", JacksonFunctions.pojoToNode(CachePointTransformer.getSourceServiceIds(list)));
            putObject.put("pipeline", str2);
            putObject.put("generated", CacheDateUtils.formatCurrentTime());
            if (i2 > 0) {
                putObject.put("expires", CacheDateUtils.formatRelativeTime(i2));
            }
            putObject.put("statusCode", i);
            ArrayList arrayList = new ArrayList();
            Iterator<CaravanHttpRequest> it = list.iterator();
            while (it.hasNext()) {
                arrayList.add(StringUtils.substringBefore(it.next().getUrl(), "?"));
            }
            putObject.set("sourcePaths", JacksonFunctions.pojoToNode(arrayList));
            if (StringUtils.isNotBlank(str3)) {
                putObject.put("reason", str3);
            }
            putObject.set("contextProperties", JacksonFunctions.pojoToNode(map));
            emptyObject.set(CACHE_CONTENT_PROPERTY, jsonNode);
            return emptyObject;
        }

        public String getEnvelopeString() {
            return JacksonFunctions.nodeToString(this.envelopeNode);
        }

        JsonNode getContentNode() {
            return this.contentNode;
        }

        ObjectNode getMetadataNode() {
            return this.metadataNode;
        }

        int getStatusCode() {
            return this.metadataNode.at("/statusCode").asInt(200);
        }

        String getReasonString() {
            return this.metadataNode.at("/reason").asText("Not Found");
        }

        String getSources() {
            return this.metadataNode.at("/sources").toString();
        }

        int getResponseAge() {
            return CacheDateUtils.getSecondsSince(getGeneratedDate());
        }

        String getGeneratedDate() {
            return this.metadataNode.at("/generated").asText();
        }

        int getExpirySeconds() {
            return !this.metadataNode.has("expires") ? (int) TimeUnit.DAYS.toSeconds(365L) : CacheDateUtils.getSecondsUntil(this.metadataNode.at("/expires").asText());
        }

        public void setGeneratedDate(String str) {
            this.metadataNode.put("generated", str);
        }

        public void setExpiresDate(String str) {
            this.metadataNode.put("expires", str);
        }
    }

    /* loaded from: input_file:io/wcm/caravan/pipeline/impl/operators/CachePointTransformer$CacheResponseObserver.class */
    public final class CacheResponseObserver implements Observer<String> {
        private final String cacheKey;
        private final Observable<JsonPipelineOutput> originalSource;
        private final Subscriber<? super JsonPipelineOutput> subscriber;
        private boolean cacheHit;

        private CacheResponseObserver(String str, Observable<JsonPipelineOutput> observable, Subscriber<? super JsonPipelineOutput> subscriber) {
            this.cacheKey = str;
            this.originalSource = observable;
            this.subscriber = subscriber;
        }

        public void onNext(String str) {
            final CacheEnvelope fromEnvelopeString = CacheEnvelope.fromEnvelopeString(str, this.cacheKey);
            if (fromEnvelopeString == null) {
                CachePointTransformer.log.warn("CACHE ERROR for {} - the cached response could not be parsed,\n{}", this.cacheKey, CachePointTransformer.this.correlationId);
                return;
            }
            this.cacheHit = true;
            int responseAge = fromEnvelopeString.getResponseAge();
            int refreshInterval = CachePointTransformer.this.strategy.getCachePersistencyOptions(CachePointTransformer.this.requests).getRefreshInterval();
            int expirySeconds = fromEnvelopeString.getExpirySeconds();
            int clientMaxAge = getClientMaxAge();
            if (responseAge >= refreshInterval || responseAge >= clientMaxAge || expirySeconds <= 0) {
                CachePointTransformer.log.debug("CACHE STALE - content for {} is available, but {},\n{}", new Object[]{this.cacheKey, responseAge >= refreshInterval ? "it's " + responseAge + " seconds old and the cache strategy has a refresh interval of " + refreshInterval + " seconds." : responseAge >= clientMaxAge ? "it's " + responseAge + " seconds old and the client requested a max-age of " + clientMaxAge + " seconds." : "it has expired " + (-expirySeconds) + " seconds ago, according to the original max-age header from the http-response", CachePointTransformer.this.correlationId});
                fetchAndStore(new Subscriber<JsonPipelineOutput>() { // from class: io.wcm.caravan.pipeline.impl.operators.CachePointTransformer.CacheResponseObserver.1
                    public void onNext(JsonPipelineOutput jsonPipelineOutput) {
                        CacheResponseObserver.this.subscriber.onNext(jsonPipelineOutput);
                    }

                    public void onCompleted() {
                        CacheResponseObserver.this.subscriber.onCompleted();
                    }

                    public void onError(Throwable th) {
                        Exceptions.throwIfFatal(th);
                        if (fromEnvelopeString.getStatusCode() >= 400) {
                            CacheResponseObserver.this.subscriber.onError(th);
                            return;
                        }
                        if ((th instanceof JsonPipelineInputException) && ((JsonPipelineInputException) th).getStatusCode() == 404) {
                            CachePointTransformer.log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + CacheResponseObserver.this.cacheKey + ",\n" + CachePointTransformer.this.correlationId + "\n" + th.getMessage());
                        } else {
                            CachePointTransformer.log.warn("CACHE FALLBACK - Using stale content from cache as a fallback after failing to fresh content for " + CacheResponseObserver.this.cacheKey + ",\n" + CachePointTransformer.this.correlationId, th);
                        }
                        CacheResponseObserver.this.subscriber.onNext(new JsonPipelineOutputImpl(fromEnvelopeString.getContentNode(), CachePointTransformer.this.requests).withMaxAge(0));
                        CacheResponseObserver.this.subscriber.onCompleted();
                    }
                });
            } else {
                CachePointTransformer.log.debug("CACHE HIT for {},\n{}", this.cacheKey, CachePointTransformer.this.correlationId);
                serveCachedContent(fromEnvelopeString, refreshInterval);
            }
        }

        private int getClientMaxAge() {
            int i;
            int seconds = (int) TimeUnit.DAYS.toSeconds(365L);
            for (String str : ((CaravanHttpRequest) CachePointTransformer.this.requests.get(0)).getHeaders().get("Cache-Control")) {
                if (str.startsWith("max-age") && (i = NumberUtils.toInt(StringUtils.substringAfter(str, "="), seconds)) > 0) {
                    seconds = i;
                }
            }
            return seconds;
        }

        private void serveCachedContent(CacheEnvelope cacheEnvelope, int i) {
            if (cacheEnvelope.getStatusCode() == 404) {
                this.subscriber.onError(new JsonPipelineInputException(404, cacheEnvelope.getReasonString() + (" (Cached from " + cacheEnvelope.getSources() + " at " + cacheEnvelope.getGeneratedDate() + ")")));
            } else {
                this.subscriber.onNext(new JsonPipelineOutputImpl(cacheEnvelope.getContentNode(), CachePointTransformer.this.requests).withMaxAge(Math.min(i - cacheEnvelope.getResponseAge(), cacheEnvelope.getExpirySeconds())));
                this.subscriber.onCompleted();
            }
        }

        public void onCompleted() {
            if (this.cacheHit) {
                return;
            }
            CachePointTransformer.log.debug("CACHE MISS for {} fetching response from {} through pipeline,\n{}", new Object[]{this.cacheKey, CachePointTransformer.this.getSourceServicePrefix(), CachePointTransformer.this.correlationId});
            fetchAndStore(this.subscriber);
        }

        public void onError(Throwable th) {
            Exceptions.throwIfFatal(th);
            CachePointTransformer.log.warn("Failed to connect to couchbase server, falling back to direct connection to " + CachePointTransformer.this.getSourceServicePrefix() + ",\n" + CachePointTransformer.this.correlationId, th);
            fetchAndStore(this.subscriber);
        }

        private void fetchAndStore(final Subscriber<? super JsonPipelineOutput> subscriber) {
            this.originalSource.subscribe(new Observer<JsonPipelineOutput>() { // from class: io.wcm.caravan.pipeline.impl.operators.CachePointTransformer.CacheResponseObserver.2
                public void onNext(JsonPipelineOutput jsonPipelineOutput) {
                    CachePersistencyOptions cachePersistencyOptions = CachePointTransformer.this.strategy.getCachePersistencyOptions(CachePointTransformer.this.requests);
                    int refreshInterval = cachePersistencyOptions.getRefreshInterval();
                    if (jsonPipelineOutput.getMaxAge() >= 0) {
                        refreshInterval = Math.min(refreshInterval, jsonPipelineOutput.getMaxAge());
                    }
                    CachePointTransformer.log.debug("CACHE PUT - response for {} has been fetched and will be put in the cache, max-age={} sec,\n{}", new Object[]{CacheResponseObserver.this.cacheKey, Integer.valueOf(refreshInterval), CachePointTransformer.this.correlationId});
                    CachePointTransformer.this.context.getCacheAdapter().put(CacheResponseObserver.this.cacheKey, CacheEnvelope.from200Response(jsonPipelineOutput.getPayload(), refreshInterval, CachePointTransformer.this.requests, CacheResponseObserver.this.cacheKey, CachePointTransformer.this.descriptor, CachePointTransformer.this.context.getProperties()).getEnvelopeString(), cachePersistencyOptions);
                    subscriber.onNext(jsonPipelineOutput.withMaxAge(refreshInterval));
                }

                public void onCompleted() {
                    subscriber.onCompleted();
                }

                public void onError(Throwable th) {
                    Exceptions.throwIfFatal(th);
                    if ((th instanceof JsonPipelineInputException) && ((JsonPipelineInputException) th).getStatusCode() == 404) {
                        CachePersistencyOptions createTransient = CachePersistencyOptions.createTransient(60);
                        CachePointTransformer.log.debug("CACHE PUT - 404 response for {} will be stored in the cache, max-age={} sec,\n{}", new Object[]{CachePointTransformer.this.descriptor, Integer.valueOf(createTransient.getRefreshInterval()), CachePointTransformer.this.correlationId});
                        CachePointTransformer.this.context.getCacheAdapter().put(CacheResponseObserver.this.cacheKey, CacheEnvelope.from404Response(th.getMessage(), 60, CachePointTransformer.this.requests, CacheResponseObserver.this.cacheKey, CachePointTransformer.this.descriptor, CachePointTransformer.this.context.getProperties()).getEnvelopeString(), createTransient);
                    }
                    subscriber.onError(th);
                }
            });
        }
    }

    public CachePointTransformer(JsonPipelineContextImpl jsonPipelineContextImpl, List<CaravanHttpRequest> list, String str, CacheStrategy cacheStrategy) {
        this.context = jsonPipelineContextImpl;
        this.requests = list;
        this.descriptor = str;
        this.strategy = cacheStrategy;
        StringBuffer stringBuffer = new StringBuffer();
        for (CaravanHttpRequest caravanHttpRequest : list) {
            if (stringBuffer.length() == 0) {
                stringBuffer.append(caravanHttpRequest.getCorrelationId());
            } else {
                stringBuffer.append(",").append(caravanHttpRequest.getCorrelationId());
            }
        }
        this.correlationId = stringBuffer.toString();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static SortedSet<String> getSourceServiceIds(List<CaravanHttpRequest> list) {
        TreeSet treeSet = new TreeSet();
        Iterator<CaravanHttpRequest> it = list.iterator();
        while (it.hasNext()) {
            treeSet.add(it.next().getServiceId());
        }
        return treeSet;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getSourceServicePrefix() {
        return StringUtils.join(getSourceServiceIds(this.requests), '+');
    }

    public Observable<JsonPipelineOutput> call(Observable<JsonPipelineOutput> observable) {
        return Observable.create(subscriber -> {
            String sourceServicePrefix = getSourceServicePrefix();
            CacheAdapter cacheAdapter = this.context.getCacheAdapter();
            String str = sourceServicePrefix + ":" + this.descriptor;
            Observable observable2 = cacheAdapter.get(str, this.strategy.getCachePersistencyOptions(this.requests));
            MetricRegistry metricRegistry = this.context.getMetricRegistry();
            observable2.lift(new TimerMetricsOperator(metricRegistry.timer(MetricRegistry.name(getClass(), new String[]{sourceServicePrefix, "latency", "get"})))).lift(new HitsAndMissesCountingMetricsOperator(metricRegistry.counter(MetricRegistry.name(getClass(), new String[]{sourceServicePrefix, "hits"})), metricRegistry.counter(MetricRegistry.name(getClass(), new String[]{sourceServicePrefix, "misses"})))).subscribe(new CacheResponseObserver(str, observable, subscriber));
        });
    }
}
