/*
 * Decompiled with CFR 0.152.
 */
package io.axual.client.proxy.switching.consumer;

import io.axual.client.proxy.generic.client.ClientProxyFactory;
import io.axual.client.proxy.generic.consumer.ConsumerProxy;
import io.axual.client.proxy.switching.consumer.Assignment;
import io.axual.client.proxy.switching.consumer.Subscription;
import io.axual.client.proxy.switching.consumer.SwitchingConsumerConfig;
import io.axual.client.proxy.switching.generic.BaseClientProxySwitcher;
import io.axual.discovery.client.DiscoveryResult;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Proxy switcher for consumers that remembers the most recently applied
 * {@link Assignment} and {@link Subscription} and re-applies them to the
 * replacement proxy whenever {@link #switchProxy} produces a new one.
 *
 * <p>The class holds plain mutable fields without any synchronization of its
 * own; callers are expected to provide whatever coordination they need.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class ConsumerSwitcher<K, V>
        extends BaseClientProxySwitcher<ConsumerProxy<K, V>, SwitchingConsumerConfig<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSwitcher.class);

    /** Last assignment applied through this switcher, or {@code null} when none is active. */
    private Assignment<K, V> assignment;

    /** Last subscription applied through this switcher, or {@code null} when none is active. */
    private Subscription<K, V> subscription;

    /**
     * Returns the assignment currently remembered by this switcher.
     *
     * @return the remembered assignment, or {@code null} if none is set
     */
    public Assignment<K, V> getAssignment() {
        return this.assignment;
    }

    /**
     * Applies {@code assignment} to {@code consumer} and remembers it for later switches.
     * Passing {@code null} clears the remembered assignment and, if one was set,
     * calls {@link Consumer#unsubscribe()} on the consumer.
     *
     * @param consumer   consumer to apply the assignment to
     * @param config     switching configuration (not consulted on this non-switching path)
     * @param assignment assignment to apply, or {@code null} to clear
     */
    public void setAssignment(Consumer<K, V> consumer, SwitchingConsumerConfig config, Assignment<K, V> assignment) {
        this.updateAssignment(consumer, config, assignment, false);
    }

    /**
     * Returns the subscription currently remembered by this switcher.
     *
     * @return the remembered subscription, or {@code null} if none is set
     */
    public Subscription<K, V> getSubscription() {
        return this.subscription;
    }

    /**
     * Applies {@code subscription} to {@code consumer} and remembers it for later switches.
     * Passing {@code null} clears the remembered subscription and, if one was set,
     * calls {@link Consumer#unsubscribe()} on the consumer.
     *
     * @param consumer     consumer to apply the subscription to
     * @param config       switching configuration (not consulted on this non-switching path)
     * @param subscription subscription to apply, or {@code null} to clear
     */
    public void setSubscription(Consumer<K, V> consumer, SwitchingConsumerConfig config, Subscription<K, V> subscription) {
        this.updateSubscription(consumer, config, subscription, false);
    }

    /**
     * Clears both the remembered assignment and the remembered subscription,
     * unsubscribing {@code consumer} for each one that was set.
     *
     * @param consumer consumer to unsubscribe
     * @param config   switching configuration (not consulted on this non-switching path)
     */
    public void unsubscribe(Consumer<K, V> consumer, SwitchingConsumerConfig config) {
        this.updateAssignment(consumer, config, null, false);
        this.updateSubscription(consumer, config, null, false);
    }

    /**
     * Delegates the actual switch to the superclass and then re-applies the
     * remembered assignment and subscription to the proxy it returns.
     *
     * @param oldProxy  proxy in use before the switch
     * @param config    switching configuration
     * @param oldResult discovery result in effect before the switch
     * @param newResult discovery result to switch to
     * @return the proxy returned by the superclass, with assignment/subscription re-applied
     */
    @Override
    public ConsumerProxy<K, V> switchProxy(ConsumerProxy<K, V> oldProxy, SwitchingConsumerConfig<K, V> config, DiscoveryResult oldResult, DiscoveryResult newResult) {
        ConsumerProxy<K, V> result = super.switchProxy(oldProxy, config, oldResult, newResult);
        LOG.info("Consumer switched, applying assignments and subscriptions");
        this.updateAssignment(result, config, this.assignment, true);
        this.updateSubscription(result, config, this.subscription, true);
        LOG.info("Consumer switch finished");
        return result;
    }

    /**
     * Builds a new backing proxy from the downstream configs overlaid with the
     * configs carried by {@code discoveryResult} (discovery values win on key clashes).
     *
     * @param config          switching configuration supplying downstream configs and the backing factory
     * @param discoveryResult discovery result whose configs are merged on top
     * @return the proxy created by the backing factory
     */
    // The parameter is declared raw (signature kept for compatibility), so the
    // accessors below yield raw types; the warnings are confined to this method.
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected ConsumerProxy<K, V> createProxyObject(SwitchingConsumerConfig config, DiscoveryResult discoveryResult) {
        // Merge into a private copy: getDownstreamConfigs() may hand out the config's own
        // map (not visible from this class), and writing discovery values into it would
        // let settings from one discovery result linger in the config across switches.
        Map<Object, Object> properties = new HashMap<>(config.getDownstreamConfigs());
        properties.putAll(discoveryResult.getConfigs());
        // NOTE(review): this logs the complete property map at INFO level; if it can
        // contain credentials (e.g. keystore passwords) consider masking — confirm.
        LOG.info("Creating a new {} with properties: {}", config.getProxyType(), properties);
        ClientProxyFactory factory = config.getBackingFactory();
        return (ConsumerProxy<K, V>) factory.create(properties);
    }

    /**
     * Determines how long to wait before switching.
     *
     * <ul>
     *   <li>{@link Duration#ZERO} when neither a subscription nor an assignment is remembered;</li>
     *   <li>the TTL of {@code newResult} when {@code config.isAtLeastOnce()} is true;</li>
     *   <li>otherwise the larger of the superclass timeout and the TTL of {@code newResult}.</li>
     * </ul>
     *
     * @param config    switching configuration
     * @param oldResult discovery result in effect before the switch
     * @param newResult discovery result to switch to
     * @return the switch timeout
     */
    @Override
    protected Duration getSwitchTimeout(SwitchingConsumerConfig config, DiscoveryResult oldResult, DiscoveryResult newResult) {
        if (this.subscription == null && this.assignment == null) {
            return Duration.ZERO;
        }
        if (config.isAtLeastOnce()) {
            return Duration.ofMillis(newResult.getTtl());
        }
        long inheritedMillis = super.getSwitchTimeout(config, oldResult, newResult).toMillis();
        return Duration.ofMillis(Math.max(inheritedMillis, newResult.getTtl()));
    }

    /**
     * Applies {@code newAssignment} to {@code consumer}, or unsubscribes when it is
     * {@code null} and an assignment was previously remembered; then remembers the new value.
     *
     * @param consumer      consumer to operate on
     * @param config        switching configuration; only consulted while switching
     * @param newAssignment assignment to apply, or {@code null} to clear
     * @param isSwitching   {@code true} when invoked as part of {@link #switchProxy}
     */
    private void updateAssignment(Consumer<K, V> consumer, SwitchingConsumerConfig config, Assignment<K, V> newAssignment, boolean isSwitching) {
        if (newAssignment != null) {
            // The flag is only ever true during a switch with at-most-once-on-switch configured.
            boolean seekToEnd = isSwitching && config.isIstAtMostOnceOnSwitch();
            newAssignment.assign(consumer, seekToEnd);
        } else if (this.assignment != null) {
            consumer.unsubscribe();
        }
        this.assignment = newAssignment;
    }

    /**
     * Applies {@code newSubscription} to {@code consumer}, or unsubscribes when it is
     * {@code null} and a subscription was previously remembered; then remembers the new value.
     * When the new subscription is of a different class than the remembered one, the
     * consumer is unsubscribed first.
     *
     * @param consumer        consumer to operate on
     * @param config          switching configuration; only consulted while switching
     * @param newSubscription subscription to apply, or {@code null} to clear
     * @param isSwitching     {@code true} when invoked as part of {@link #switchProxy}
     */
    private void updateSubscription(Consumer<K, V> consumer, SwitchingConsumerConfig config, Subscription<K, V> newSubscription, boolean isSwitching) {
        if (newSubscription != null) {
            if (this.subscription != null && this.subscription.getClass() != newSubscription.getClass()) {
                // Different kind of subscription than before: drop the old one explicitly.
                consumer.unsubscribe();
            }
            // The flag is only ever true during a switch with at-most-once-on-switch configured.
            boolean seekToEnd = isSwitching && config.isIstAtMostOnceOnSwitch();
            newSubscription.subscribe(consumer, seekToEnd);
        } else if (this.subscription != null) {
            consumer.unsubscribe();
        }
        this.subscription = newSubscription;
    }
}

