/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.http.utils;

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.ConsumableEvent;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.DelegatingFilterableStreamingHttpLoadBalancedConnection;
import io.servicetalk.http.api.FilterableStreamingHttpLoadBalancedConnection;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpLoadBalancerFactory;
import io.servicetalk.http.utils.CacheConnectionFactory;
import io.servicetalk.transport.api.TransportObserver;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToIntFunction;
import javax.annotation.Nullable;

/**
 * An {@link HttpLoadBalancerFactory} decorator that wraps the {@link ConnectionFactory} passed to the delegate
 * in a {@link CacheConnectionFactory}.
 * <p>
 * Every connection created through this factory is observed for {@link HttpEventKey#MAX_CONCURRENCY} events so
 * that the concurrency limit supplied to the {@link CacheConnectionFactory} reflects the most recent value
 * reported for a resolved address. Until such a value has been observed, the user supplied
 * {@code maxConcurrencyFunc} is used.
 *
 * @param <ResolvedAddress> the type of resolved address.
 * @see CacheConnectionFactory
 */
public final class CacheConnectionHttpLoadBalanceFactory<ResolvedAddress>
        implements HttpLoadBalancerFactory<ResolvedAddress> {
    private final ToIntFunction<ResolvedAddress> maxConcurrencyFunc;
    private final HttpLoadBalancerFactory<ResolvedAddress> delegate;

    /**
     * Creates a new instance.
     *
     * @param delegate the {@link HttpLoadBalancerFactory} that creates the actual {@link LoadBalancer}.
     * @param maxConcurrencyFunc supplies the initial concurrency estimate for a resolved address; it is used
     * whenever no concurrency value is currently tracked for that address.
     * @throws NullPointerException if any argument is {@code null}.
     */
    public CacheConnectionHttpLoadBalanceFactory(HttpLoadBalancerFactory<ResolvedAddress> delegate,
                                                 ToIntFunction<ResolvedAddress> maxConcurrencyFunc) {
        this.maxConcurrencyFunc = Objects.requireNonNull(maxConcurrencyFunc, "maxConcurrencyFunc");
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    /**
     * Unsupported overload; always throws.
     *
     * @param targetResource ignored.
     * @param eventPublisher ignored.
     * @param connectionFactory ignored.
     * @param <T> the type of connection.
     * @return never returns normally.
     * @throws UnsupportedOperationException always.
     */
    public <T extends FilterableStreamingHttpLoadBalancedConnection> LoadBalancer<T> newLoadBalancer(
            String targetResource,
            Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
            ConnectionFactory<ResolvedAddress, T> connectionFactory) {
        // NOTE(review): the message names "newLoadBalancerTyped", which is not declared in this class; the
        // supported overload here is newLoadBalancer(Publisher, ConnectionFactory, String). Message kept as-is
        // because callers/tests may match on it - confirm before rewording.
        throw new UnsupportedOperationException("Use newLoadBalancerTyped instead.");
    }

    /**
     * Creates a {@link LoadBalancer} via the delegate, substituting {@code connectionFactory} with a
     * {@link CacheConnectionFactory} that wraps it.
     *
     * @param eventPublisher the service discovery events, passed through to the delegate unchanged.
     * @param connectionFactory the factory used to create the underlying connections.
     * @param targetResource passed through to the delegate unchanged.
     * @return the {@link LoadBalancer} created by the delegate.
     */
    public LoadBalancer<FilterableStreamingHttpLoadBalancedConnection> newLoadBalancer(
            Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
            ConnectionFactory<ResolvedAddress, FilterableStreamingHttpLoadBalancedConnection> connectionFactory,
            String targetResource) {
        final HttpCacheConnectionFactory<ResolvedAddress> cacheFactory =
                new HttpCacheConnectionFactory<>(connectionFactory, maxConcurrencyFunc);
        return delegate.newLoadBalancer(eventPublisher,
                new CacheConnectionFactory<>(cacheFactory, address -> {
                    // Prefer the value tracked from MAX_CONCURRENCY events; fall back to the user estimate
                    // when nothing is tracked for this address.
                    final ConcurrencyRefCnt tracked = cacheFactory.maxConcurrentMap.get(address);
                    return tracked == null ? maxConcurrencyFunc.applyAsInt(address) : tracked.concurrency;
                }), targetResource);
    }

    /**
     * Returns the offloading requirements of the delegate.
     *
     * @return {@code delegate.requiredOffloads()}.
     */
    public HttpExecutionStrategy requiredOffloads() {
        return delegate.requiredOffloads();
    }

    /**
     * Immutable pair of the current concurrency value for an address and the number of connections that
     * reference it. Instances are replaced (never mutated) inside the map's atomic compute functions.
     */
    private static final class ConcurrencyRefCnt {
        final int concurrency;
        final int refCnt;

        private ConcurrencyRefCnt(int concurrency, int refCnt) {
            this.concurrency = concurrency;
            this.refCnt = refCnt;
        }
    }

    /**
     * Connection wrapper that registers itself in the shared concurrency map on construction and keeps the map
     * entry for its address updated from the connection's {@link HttpEventKey#MAX_CONCURRENCY} event stream.
     *
     * @param <RA> the type of resolved address.
     */
    private static final class MaxConcurrencyFilterableStreamingHttpLoadBalancedConnection<RA>
            extends DelegatingFilterableStreamingHttpLoadBalancedConnection {

        /**
         * @param delegate the connection to wrap.
         * @param maxConcurrentMap the shared per-address concurrency/reference-count map.
         * @param resolvedAddress the address this connection was created for; used as the map key.
         * @param concurrencyEstimator supplies the initial concurrency if the address has no entry yet.
         */
        MaxConcurrencyFilterableStreamingHttpLoadBalancedConnection(
                FilterableStreamingHttpLoadBalancedConnection delegate,
                final Map<RA, ConcurrencyRefCnt> maxConcurrentMap,
                final RA resolvedAddress,
                ToIntFunction<RA> concurrencyEstimator) {
            super(delegate);
            // Create the entry with the estimate and refCnt 1, or keep the tracked concurrency and bump refCnt.
            maxConcurrentMap.compute(resolvedAddress, (ra, current) -> current == null ?
                    new ConcurrencyRefCnt(concurrencyEstimator.applyAsInt(ra), 1) :
                    new ConcurrencyRefCnt(current.concurrency, current.refCnt + 1));
            SourceAdapters.toSource(delegate.transportEventStream(HttpEventKey.MAX_CONCURRENCY))
                    .subscribe(new PublisherSource.Subscriber<ConsumableEvent<Integer>>() {

                public void onSubscribe(PublisherSource.Subscription subscription) {
                    // Unbounded demand: every concurrency update should be observed.
                    subscription.request(Long.MAX_VALUE);
                }

                public void onNext(@Nullable ConsumableEvent<Integer> integerConsumableEvent) {
                    if (integerConsumableEvent == null) {
                        return;
                    }
                    final int concurrency = integerConsumableEvent.event();
                    assert concurrency >= 0;
                    // A value of 0 is not recorded; the previously tracked value stays in place.
                    if (concurrency > 0) {
                        maxConcurrentMap.computeIfPresent(resolvedAddress,
                                (ra, current) -> new ConcurrencyRefCnt(concurrency, current.refCnt));
                    }
                }

                public void onError(Throwable t) {
                    decrementRefCnt();
                }

                public void onComplete() {
                    decrementRefCnt();
                }

                /**
                 * Releases this connection's reference; the map entry is removed once the last reference
                 * is gone (returning {@code null} from the remapping function removes the mapping).
                 */
                private void decrementRefCnt() {
                    maxConcurrentMap.computeIfPresent(resolvedAddress, (ra, current) -> current.refCnt <= 1 ?
                            null : new ConcurrencyRefCnt(current.concurrency, current.refCnt - 1));
                }
            });
        }
    }

    /**
     * {@link ConnectionFactory} that wraps every connection created by the delegate in a
     * {@link MaxConcurrencyFilterableStreamingHttpLoadBalancedConnection} sharing one concurrency map.
     * All close/lifecycle methods are forwarded to the delegate.
     *
     * @param <RA> the type of resolved address.
     */
    private static final class HttpCacheConnectionFactory<RA>
            implements ConnectionFactory<RA, FilterableStreamingHttpLoadBalancedConnection> {
        private final Map<RA, ConcurrencyRefCnt> maxConcurrentMap = new ConcurrentHashMap<>();
        private final ToIntFunction<RA> maxConcurrencyFunc;
        private final ConnectionFactory<RA, FilterableStreamingHttpLoadBalancedConnection> delegate;

        private HttpCacheConnectionFactory(
                ConnectionFactory<RA, FilterableStreamingHttpLoadBalancedConnection> delegate,
                ToIntFunction<RA> maxConcurrencyFunc) {
            this.delegate = Objects.requireNonNull(delegate);
            this.maxConcurrencyFunc = Objects.requireNonNull(maxConcurrencyFunc);
        }

        public Single<FilterableStreamingHttpLoadBalancedConnection> newConnection(
                RA resolvedAddress, @Nullable ContextMap context, @Nullable TransportObserver observer) {
            return delegate.newConnection(resolvedAddress, context, observer)
                    .map(connection -> new MaxConcurrencyFilterableStreamingHttpLoadBalancedConnection<>(
                            connection, maxConcurrentMap, resolvedAddress, maxConcurrencyFunc));
        }

        public Completable closeAsync() {
            return delegate.closeAsync();
        }

        public Completable closeAsyncGracefully() {
            return delegate.closeAsyncGracefully();
        }

        public Completable onClose() {
            return delegate.onClose();
        }

        public Completable onClosing() {
            return delegate.onClosing();
        }
    }
}

