/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed;

import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wraps a {@link WorkerCoordinator} together with the network client, metadata and metrics
 * objects it is constructed with, and exposes a small facade (poll, wakeup, rejoin, leave,
 * owner lookup, stop) over that coordinator.
 *
 * <p>The constructor wires all collaborators together from a {@link DistributedConfig}; if any
 * step fails, whatever has already been created is closed before the failure is rethrown.
 *
 * <p>NOTE(review): the {@code stopped} flag is a plain field with no synchronization visible in
 * this class — presumably {@link #stop()} is only called from a single thread; confirm against
 * callers.
 */
public class WorkerGroupMember {
    private static final Logger log = LoggerFactory.getLogger(WorkerGroupMember.class);
    /** Source of the numeric suffix used when no {@code client.id} is configured. */
    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    /** Prefix used for the JMX reporter and for app-info (un)registration. */
    private static final String JMX_PREFIX = "kafka.connect";
    private final Time time;
    private final String clientId;
    private final ConsumerNetworkClient client;
    private final Metrics metrics;
    private final Metadata metadata;
    private final long retryBackoffMs;
    private final WorkerCoordinator coordinator;
    /** Set once {@link #stop(boolean)} has started; makes the public {@link #stop()} a no-op afterwards. */
    private boolean stopped = false;

    /**
     * Builds the metrics registry, metadata, network client and coordinator for this member.
     *
     * @param config        distributed worker configuration supplying client, metrics and group settings
     * @param restUrl       passed through to the {@link WorkerCoordinator}
     * @param configStorage passed through to the {@link WorkerCoordinator}
     * @param listener      passed through to the {@link WorkerCoordinator}
     * @param time          time source shared by the metrics, network client and coordinator
     * @throws KafkaException if any part of the construction fails; the original failure is the cause
     */
    public WorkerGroupMember(DistributedConfig config, String restUrl, ConfigBackingStore configStorage, WorkerRebalanceListener listener, Time time) {
        try {
            this.time = time;

            // Fall back to a generated "connect-N" id when client.id is configured as empty.
            String clientIdConfig = config.getString("client.id");
            this.clientId = clientIdConfig.isEmpty()
                    ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement()
                    : clientIdConfig;

            // Every metric created through this registry is tagged with the client id.
            LinkedHashMap<String, String> metricsTags = new LinkedHashMap<String, String>();
            metricsTags.put("client-id", this.clientId);
            MetricConfig metricConfig = new MetricConfig()
                    .samples(config.getInt("metrics.num.samples"))
                    .timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            // The JMX reporter is always added on top of the user-configured reporters.
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);

            this.retryBackoffMs = config.getLong("retry.backoff.ms");
            this.metadata = new Metadata(this.retryBackoffMs, config.getLong("metadata.max.age.ms"));
            // Seed the metadata with a bootstrap cluster built from the configured servers.
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
            this.metadata.update(Cluster.bootstrap(addresses), 0L);

            String metricGrpPrefix = "connect";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            Selector selector = new Selector(
                    config.getLong("connections.max.idle.ms"),
                    this.metrics,
                    time,
                    metricGrpPrefix,
                    channelBuilder);
            NetworkClient netClient = new NetworkClient(
                    selector,
                    this.metadata,
                    this.clientId,
                    100, // fixed value; presumably the max in-flight requests per connection — confirm against NetworkClient
                    config.getLong("reconnect.backoff.ms"),
                    config.getInt("send.buffer.bytes"),
                    config.getInt("receive.buffer.bytes"),
                    config.getInt("request.timeout.ms"),
                    time);
            this.client = new ConsumerNetworkClient(
                    netClient,
                    this.metadata,
                    time,
                    this.retryBackoffMs,
                    config.getInt("request.timeout.ms"));
            this.coordinator = new WorkerCoordinator(
                    this.client,
                    config.getString("group.id"),
                    config.getInt("rebalance.timeout.ms"),
                    config.getInt("session.timeout.ms"),
                    config.getInt("heartbeat.interval.ms"),
                    this.metrics,
                    metricGrpPrefix,
                    this.time,
                    this.retryBackoffMs,
                    restUrl,
                    configStorage,
                    listener);

            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId);
            log.debug("Connect group member created");
        }
        catch (Throwable t) {
            // Close whatever was already constructed so a failed construction does not leak
            // resources; close errors are swallowed so the original failure is the one reported.
            this.stop(true);
            throw new KafkaException("Failed to construct the Connect group member", t);
        }
    }

    /**
     * Stops this member, closing the coordinator, metrics and network client.
     * Does nothing if a stop has already been initiated.
     *
     * @throws KafkaException if closing any of the underlying resources failed
     */
    public void stop() {
        if (this.stopped) {
            return;
        }
        this.stop(false);
    }

    /** Polls the coordinator with a zero timeout. */
    public void ensureActive() {
        this.coordinator.poll(0L);
    }

    /**
     * Polls the coordinator with the given timeout.
     *
     * @param timeout timeout handed to the coordinator's poll; must not be negative
     * @throws IllegalArgumentException if {@code timeout} is negative
     */
    public void poll(long timeout) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("Timeout must not be negative");
        }
        this.coordinator.poll(timeout);
    }

    /** Calls {@code wakeup()} on the underlying network client. */
    public void wakeup() {
        this.client.wakeup();
    }

    /**
     * @return the member id reported by the coordinator
     */
    public String memberId() {
        return this.coordinator.memberId();
    }

    /** Asks the coordinator to rejoin the group. */
    public void requestRejoin() {
        this.coordinator.requestRejoin();
    }

    /** Delegates to the coordinator's {@code maybeLeaveGroup()}. */
    public void maybeLeaveGroup() {
        this.coordinator.maybeLeaveGroup();
    }

    /**
     * @param connector connector name to look up
     * @return the owner URL the coordinator reports for the connector
     */
    public String ownerUrl(String connector) {
        return this.coordinator.ownerUrl(connector);
    }

    /**
     * @param task task id to look up
     * @return the owner URL the coordinator reports for the task
     */
    public String ownerUrl(ConnectorTaskId task) {
        return this.coordinator.ownerUrl(task);
    }

    /**
     * Closes the coordinator, metrics and network client (in that order) and unregisters the
     * app-info entry for this client id.
     *
     * @param swallowException when {@code true}, a failure recorded while closing is not rethrown
     * @throws KafkaException wrapping the first recorded close failure, unless swallowed
     */
    private void stop(boolean swallowException) {
        log.trace("Stopping the Connect group member.");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        // Mark as stopped up front so a later public stop() returns immediately.
        this.stopped = true;
        ClientUtils.closeQuietly(this.coordinator, "coordinator", firstException);
        ClientUtils.closeQuietly(this.metrics, "consumer metrics", firstException);
        ClientUtils.closeQuietly(this.client, "consumer network client", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId);
        Throwable failure = firstException.get();
        if (failure != null && !swallowException) {
            throw new KafkaException("Failed to stop the Connect group member", failure);
        }
        log.debug("The Connect group member has stopped.");
    }
}

