package com.jaffa.rpc.lib.kafka.receivers;

import com.jaffa.rpc.lib.JaffaService;
import java.io.Closeable;
import java.lang.Thread;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jaffa/rpc/lib/kafka/receivers/KafkaReceiver.class */
/**
 * Base class for Kafka receivers that run one worker thread per broker.
 *
 * <p>{@link #startThreadsAndWait(Runnable)} creates and starts the workers and blocks until they
 * finish; {@link #close()} interrupts every registered worker and waits for it to stop.
 *
 * <p>NOTE(review): the worker list is a plain {@code ArrayList} with no synchronization, while
 * {@code close()} is presumably invoked from a different thread than the one blocked in
 * {@code startThreadsAndWait} — confirm whether callers guarantee ordering.
 */
public abstract class KafkaReceiver implements Closeable, Runnable {
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    /** Time, in milliseconds, that {@link #close()} waits for a worker before interrupting it again. */
    private static final long JOIN_POLL_MILLIS = 100L;

    /** Every worker thread ever created by this receiver, in creation order. */
    private final ArrayList<Thread> threads = new ArrayList<>(JaffaService.getBrokersCount());

    /**
     * Starts {@code JaffaService.getBrokersCount()} threads that each execute {@code runnable},
     * then blocks until they have finished.
     *
     * <p>If the calling thread is interrupted while waiting for a worker, the failure is logged,
     * waiting continues with the next worker, and the caller's interrupt status is restored
     * before this method returns.
     *
     * @param runnable the task executed by every worker thread
     */
    public void startThreadsAndWait(Runnable runnable) {
        // Read the broker count once so the batch size cannot change mid-loop.
        final int workerCount = JaffaService.getBrokersCount();

        // Track only the threads created by this call: starting threads registered by an
        // earlier call again would throw IllegalThreadStateException.
        final ArrayList<Thread> batch = new ArrayList<>(Math.max(workerCount, 0));
        for (int i = 0; i < workerCount; i++) {
            Thread worker = new Thread(runnable);
            this.threads.add(worker);
            batch.add(worker);
        }
        batch.forEach(Thread::start);

        boolean interrupted = false;
        for (Thread worker : batch) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Remember the interrupt instead of re-asserting it right away, otherwise
                // every following join() would fail immediately.
                interrupted = true;
                log.error("Can not join thread {} in {}", worker.getName(), getClass().getSimpleName(), e);
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Interrupts every registered worker thread and waits until it is no longer alive.
     *
     * <p>A worker is interrupted repeatedly (once per {@link #JOIN_POLL_MILLIS}) in case it
     * swallows an interrupt, but the caller blocks in {@code join} between attempts rather than
     * spinning. Threads that were registered but never started are not waited for. If the calling
     * thread is interrupted while waiting, its interrupt status is restored before returning.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        boolean interrupted = false;
        for (Thread worker : this.threads) {
            do {
                worker.interrupt();
                try {
                    worker.join(JOIN_POLL_MILLIS);
                } catch (InterruptedException e) {
                    // Catching clears the flag, so the next join() blocks normally;
                    // the status is restored once all workers are down.
                    interrupted = true;
                }
            } while (worker.isAlive());
            log.info("Thread {} from {} terminated", worker.getName(), getClass().getSimpleName());
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        log.info("{} terminated", getClass().getSimpleName());
    }
}
