package io.goodforgod.testcontainers.extensions.kafka;

import io.goodforgod.testcontainers.extensions.kafka.KafkaConnection;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;

/* JADX INFO: Access modifiers changed from: package-private */
@ApiStatus.Internal
/* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl.class */
public class KafkaConnectionImpl implements KafkaConnection {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConnection.class);
    private volatile KafkaProducer<byte[], byte[]> producer;
    private volatile Admin admin;
    private final ParamsImpl params;

    @Nullable
    private final ParamsImpl paramsInNetwork;
    private volatile boolean isClosed = false;
    private final List<ConsumerImpl> consumers = new CopyOnWriteArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl$ConsumerImpl.class */
    public static final class ConsumerImpl implements KafkaConnection.Consumer {
        private static final Logger logger = LoggerFactory.getLogger(KafkaConnection.Consumer.class);
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final Set<String> topics;
        private final String clientId;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final List<ReceivedEvent> receivedPreviously = new CopyOnWriteArrayList();
        private final AtomicBoolean isActive = new AtomicBoolean(true);
        private final BlockingQueue<ConsumerRecord<byte[], byte[]>> messageQueue = new LinkedBlockingDeque();

        ConsumerImpl(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str, Collection<TopicPartition> collection) {
            this.consumer = kafkaConsumer;
            this.clientId = str;
            this.topics = (Set) collection.stream().map((v0) -> {
                return v0.topic();
            }).collect(Collectors.toSet());
            logger.trace("KafkaConsumer topics {} assigning...", this.topics);
            this.consumer.assign(collection);
            Awaitility.await().atMost(Duration.ofMinutes(1L)).pollInterval(Duration.ofMillis(50L)).until(() -> {
                try {
                    return this.consumer.listTopics(Duration.ofSeconds(10L));
                } catch (Exception e) {
                    return Collections.emptyMap();
                }
            }, map -> {
                return new HashSet(map.keySet()).containsAll(this.topics);
            });
            logger.debug("KafkaConsumer topics {} assigned", this.topics);
            logger.trace("KafkaConsumer topics {} poll starting", this.topics);
            poll(Duration.ofMillis(50L));
            this.executor.execute(this::launch);
            logger.debug("KafkaConsumer topics {} poll started.", this.topics);
        }

        private void launch() {
            logger.info("KafkaConsumer '{}' started consuming events from topics: {}", this.clientId, this.topics);
            while (this.isActive.get()) {
                try {
                    poll(Duration.ofMillis(100L));
                } catch (WakeupException | InterruptException e) {
                } catch (Exception e2) {
                    logger.error("KafkaConsumer '{}' for {} topics got unhandled exception", new Object[]{this.clientId, this.topics, e2});
                    this.consumer.close(Duration.ofMinutes(5L));
                    throw e2;
                }
            }
        }

        private void poll(Duration duration) {
            ConsumerRecords poll = this.consumer.poll(duration);
            if (poll.isEmpty()) {
                logger.trace("KafkaConsumer '{}' polled '{}' records...", this.clientId, Integer.valueOf(poll.count()));
            } else {
                logger.info("KafkaConsumer '{}' polled '{}' records from topics: {}", new Object[]{this.clientId, Integer.valueOf(poll.count()), this.topics});
            }
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                this.messageQueue.offer((ConsumerRecord) it.next());
            }
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public List<ReceivedEvent> receivedPreviously() {
            return List.copyOf(this.receivedPreviously);
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public Optional<ReceivedEvent> getReceived(@NotNull Duration duration) {
            try {
                ConsumerRecord<byte[], byte[]> poll = this.messageQueue.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                if (poll == null) {
                    return Optional.empty();
                }
                ReceivedEventImpl receivedEventImpl = new ReceivedEventImpl(poll);
                this.receivedPreviously.add(receivedEventImpl);
                return Optional.of(receivedEventImpl);
            } catch (InterruptedException e) {
                return (Optional) Assertions.fail("Expected to receive 1 event, but was interrupted: " + e.getMessage());
            }
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public ReceivedEvent getReceivedAtLeastOne(@NotNull Duration duration) {
            return getReceivedAtLeast(1, duration).get(0);
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public List<ReceivedEvent> getReceivedAtLeast(int i, @NotNull Duration duration) {
            CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
            ArrayList arrayList = new ArrayList();
            this.messageQueue.drainTo(arrayList, i);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                copyOnWriteArrayList.add(new ReceivedEventImpl((ConsumerRecord) it.next()));
            }
            if (copyOnWriteArrayList.size() == i) {
                this.receivedPreviously.addAll(copyOnWriteArrayList);
                return List.copyOf(copyOnWriteArrayList);
            }
            try {
                Awaitility.await().atMost(duration).until(() -> {
                    try {
                        ConsumerRecord<byte[], byte[]> poll = this.messageQueue.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                        if (poll == null) {
                            return copyOnWriteArrayList;
                        }
                        copyOnWriteArrayList.add(new ReceivedEventImpl(poll));
                        return copyOnWriteArrayList;
                    } catch (InterruptedException e) {
                        return (List) Assertions.fail(String.format("Expected to receive at least %s event, but was interrupted: %s", Integer.valueOf(i), e.getMessage()));
                    }
                }, list -> {
                    return list.size() >= i;
                });
            } catch (ConditionTimeoutException e) {
            }
            this.receivedPreviously.addAll(copyOnWriteArrayList);
            return List.copyOf(copyOnWriteArrayList);
        }

        private List<ReceivedEvent> getReceivedEqualsInTime(int i, @NotNull Duration duration) {
            try {
                Thread.sleep(duration.toMillis());
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                this.messageQueue.drainTo(arrayList2);
                Iterator it = arrayList2.iterator();
                while (it.hasNext()) {
                    arrayList.add(new ReceivedEventImpl((ConsumerRecord) it.next()));
                }
                this.receivedPreviously.addAll(arrayList);
                return List.copyOf(arrayList);
            } catch (InterruptedException e) {
                return (List) Assertions.fail(String.format("Expected to receive %s event, but was interrupted: %s", Integer.valueOf(i), e.getMessage()));
            }
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        public void assertReceivedNone(@NotNull Duration duration) {
            if (checkReceivedNone(duration)) {
                return;
            }
            Assertions.fail("Expected to receive 0 events, but received at least 1 event");
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public ReceivedEvent assertReceivedAtLeastOne(@NotNull Duration duration) {
            return assertReceivedAtLeast(1, duration).get(0);
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public List<ReceivedEvent> assertReceivedAtLeast(int i, @NotNull Duration duration) {
            List<ReceivedEvent> receivedAtLeast = getReceivedAtLeast(i, duration);
            return receivedAtLeast.size() < i ? (List) Assertions.fail(String.format("Expected to receive at least %s event, but received %s events", Integer.valueOf(i), Integer.valueOf(receivedAtLeast.size()))) : receivedAtLeast;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        @NotNull
        public List<ReceivedEvent> assertReceivedEqualsInTime(int i, @NotNull Duration duration) {
            List<ReceivedEvent> receivedEqualsInTime = getReceivedEqualsInTime(i, duration);
            return receivedEqualsInTime.size() != i ? (List) Assertions.fail(String.format("Expected to receive %s event, but received %s events", Integer.valueOf(i), Integer.valueOf(receivedEqualsInTime.size()))) : receivedEqualsInTime;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        public boolean checkReceivedNone(@NotNull Duration duration) {
            try {
                ConsumerRecord<byte[], byte[]> poll = this.messageQueue.poll(duration.toMillis(), TimeUnit.MILLISECONDS);
                if (poll == null) {
                    return true;
                }
                this.receivedPreviously.add(new ReceivedEventImpl(poll));
                return false;
            } catch (InterruptedException e) {
                return ((Boolean) Assertions.fail("Expected to receive 0 event, but was interrupted: " + e.getMessage())).booleanValue();
            }
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        public boolean checkReceivedAtLeast(int i, @NotNull Duration duration) {
            return getReceivedAtLeast(i, duration).size() >= i;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        public boolean checkReceivedEqualsInTime(int i, @NotNull Duration duration) {
            return getReceivedEqualsInTime(i, duration).size() == i;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Consumer
        public void reset() {
            this.receivedPreviously.clear();
            this.messageQueue.clear();
        }

        void close() {
            if (this.isActive.compareAndSet(true, false)) {
                logger.debug("Stopping KafkaConsumer '{}' for {} topics...", this.clientId, this.topics);
                long nanoTime = System.nanoTime();
                try {
                    this.executor.shutdownNow();
                    this.consumer.wakeup();
                    this.executor.awaitTermination(1L, TimeUnit.MINUTES);
                    this.consumer.close(Duration.ofMinutes(5L));
                    reset();
                    logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - nanoTime)});
                } catch (Exception e) {
                    reset();
                    logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - nanoTime)});
                } catch (Throwable th) {
                    reset();
                    logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - nanoTime)});
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/goodforgod/testcontainers/extensions/kafka/KafkaConnectionImpl$ParamsImpl.class */
    public static final class ParamsImpl implements KafkaConnection.Params {
        private final String bootstrapServers;
        private final Properties properties;

        private ParamsImpl(Properties properties) {
            this.bootstrapServers = properties.getProperty("bootstrap.servers");
            this.properties = properties;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Params
        @NotNull
        public String bootstrapServers() {
            return this.bootstrapServers;
        }

        @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection.Params
        @NotNull
        public Properties properties() {
            return this.properties;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ParamsImpl paramsImpl = (ParamsImpl) obj;
            return Objects.equals(this.bootstrapServers, paramsImpl.bootstrapServers) && Objects.equals(this.properties, paramsImpl.properties);
        }

        public int hashCode() {
            return Objects.hash(this.bootstrapServers, this.properties);
        }

        public String toString() {
            return this.bootstrapServers;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaConnectionImpl(Properties properties, @Nullable Properties properties2) {
        this.params = new ParamsImpl(properties);
        this.paramsInNetwork = properties2 == null ? null : new ParamsImpl(properties2);
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public Optional<KafkaConnection.Params> paramsInNetwork() {
        return Optional.ofNullable(this.paramsInNetwork);
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public KafkaConnection.Params params() {
        return this.params;
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public KafkaConnectionClosable withProperties(@NotNull Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(this.params.properties());
        properties2.putAll(properties);
        return new KafkaConnectionClosableImpl(properties2, (Properties) paramsInNetwork().map(params -> {
            Properties properties3 = new Properties();
            properties3.putAll(params.properties());
            properties3.putAll(properties);
            return properties3;
        }).orElse(null));
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    public void send(@NotNull String str, @NotNull Event... eventArr) {
        send(str, Arrays.asList(eventArr));
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    public void send(@NotNull String str, @NotNull List<Event> list) {
        if (this.isClosed) {
            throw new KafkaConnectionException("Can't send cause was closed");
        }
        if (this.producer == null) {
            try {
                this.producer = getProducer(this.params.properties());
            } catch (Exception e) {
                throw new KafkaConnectionException("Can't create KafkaProducer", e);
            }
        }
        createTopicsIfNeeded(Set.of(str), false);
        for (Event event : list) {
            byte[] asBytes = event.key() == null ? null : event.key().asBytes();
            List list2 = event.headers().isEmpty() ? null : (List) event.headers().stream().map(header -> {
                return new RecordHeader(header.key(), header.value().asBytes());
            }).collect(Collectors.toList());
            try {
                logger.trace("KafkaProducer sending event: {}", event);
                RecordMetadata recordMetadata = (RecordMetadata) this.producer.send(new ProducerRecord(str, (Integer) null, asBytes, event.value().asBytes(), list2)).get(10L, TimeUnit.SECONDS);
                logger.info("KafkaProducer sent event with offset '{}' with partition '{}' with timestamp '{}' event: {}", new Object[]{Long.valueOf(recordMetadata.offset()), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.timestamp()), event});
            } catch (Exception e2) {
                throw new KafkaConnectionException("KafkaProducer sent event failed: " + event, e2);
            }
        }
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public KafkaConnection.Consumer subscribe(@NotNull String... strArr) {
        return subscribe(new HashSet(Arrays.asList(strArr)));
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public KafkaConnection.Consumer subscribe(@NotNull Set<String> set) {
        if (this.isClosed) {
            throw new KafkaConnectionException("Can't subscribed cause was closed");
        }
        try {
            Admin admin = getAdmin();
            createTopicsIfNeeded(admin, set, false);
            Set set2 = (Set) ((Map) Awaitility.await().atMost(Duration.ofMinutes(1L)).pollInterval(Duration.ofMillis(100L)).until(() -> {
                try {
                    return (Map) admin.describeTopics(set).allTopicNames().get(10L, TimeUnit.SECONDS);
                } catch (Exception e) {
                    return Collections.emptyMap();
                }
            }, map -> {
                return ((Set) map.values().stream().map((v0) -> {
                    return v0.name();
                }).collect(Collectors.toSet())).containsAll(set);
            })).entrySet().stream().filter(entry -> {
                return set.contains(((TopicDescription) entry.getValue()).name());
            }).flatMap(entry2 -> {
                return ((TopicDescription) entry2.getValue()).partitions().stream().map(topicPartitionInfo -> {
                    return new TopicPartition(((TopicDescription) entry2.getValue()).name(), topicPartitionInfo.partition());
                });
            }).collect(Collectors.toSet());
            String str = "testcontainers-kafka-" + UUID.randomUUID().toString().substring(0, 8);
            ConsumerImpl consumerImpl = new ConsumerImpl(getConsumer(str, this.params.properties()), str, set2);
            this.consumers.add(consumerImpl);
            return consumerImpl;
        } catch (Exception e) {
            throw new KafkaConnectionException("Can't create KafkaConsumer", e);
        }
    }

    private static KafkaProducer<byte[], byte[]> getProducer(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("acks", "all");
        properties2.put("retries", "3");
        properties2.putAll(properties);
        return new KafkaProducer<>(properties2, new ByteArraySerializer(), new ByteArraySerializer());
    }

    private static KafkaConsumer<byte[], byte[]> getConsumer(String str, Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("auto.offset.reset", "latest");
        properties2.put("enable.auto.commit", false);
        properties2.put("max.poll.records", "5");
        properties2.put("client.id", str);
        properties2.putAll(properties);
        return new KafkaConsumer<>(properties2, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    private static Admin getAdmin(Properties properties) {
        Properties properties2 = new Properties();
        properties2.putAll(properties);
        return Admin.create(properties2);
    }

    private Admin getAdmin() {
        if (this.admin == null) {
            this.admin = getAdmin(params().properties());
        }
        return this.admin;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createTopicsIfNeeded(@NotNull Set<String> set, boolean z) {
        createTopicsIfNeeded(getAdmin(), set, z);
    }

    static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set<String> set, boolean z) {
        try {
            logger.trace("Looking for existing topics...");
            Set set2 = (Set) admin.listTopics().names().get(1L, TimeUnit.MINUTES);
            logger.debug("Found existing topics: {}", set2);
            Set set3 = (Set) set.stream().filter(str -> {
                return !set2.contains(str);
            }).map(str2 -> {
                return new NewTopic(str2, Optional.of(1), Optional.empty());
            }).collect(Collectors.toSet());
            Stream stream = set2.stream();
            Objects.requireNonNull(set);
            Set set4 = (Set) stream.filter((v1) -> {
                return r1.contains(v1);
            }).collect(Collectors.toSet());
            if (!set3.isEmpty()) {
                logger.trace("Topics {} creating...", set);
                admin.createTopics(set3).all().get(2L, TimeUnit.MINUTES);
                logger.info("Required topics {} created", set);
            } else if (!z || set4.isEmpty()) {
                logger.debug("Required topics already exist: {}", set);
            } else {
                logger.trace("Required topics {} already exist, but require reset, resetting...", set4);
                admin.deleteTopics(set4).all().get(1L, TimeUnit.MINUTES);
                logger.debug("Topics {} reset success", set4);
                Set set5 = (Set) set4.stream().map(str3 -> {
                    return new NewTopic(str3, Optional.of(1), Optional.empty());
                }).collect(Collectors.toSet());
                logger.trace("Topics {} recreating...", set4);
                Awaitility.await().atMost(Duration.ofSeconds(30L)).pollInterval(Duration.ofMillis(50L)).until(() -> {
                    try {
                        admin.createTopics(set5).all().get(10L, TimeUnit.SECONDS);
                        return true;
                    } catch (ExecutionException e) {
                        if (e.getCause() instanceof TopicExistsException) {
                            return false;
                        }
                        throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + set, e);
                    } catch (Exception e2) {
                        throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + set, e2);
                    }
                });
                logger.info("Required topics {} recreated", set4);
            }
        } catch (Exception e) {
            throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + set, e);
        } catch (TopicExistsException e2) {
            logger.trace("Required topics already exist exception received: {}", set);
        } catch (ExecutionException e3) {
            if (!(e3.getCause() instanceof TopicExistsException)) {
                throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + set, e3);
            }
            logger.trace("Required topics already exist exception received: {}", set);
        }
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    @NotNull
    public Admin admin() {
        Properties properties = new Properties();
        properties.putAll(this.params.properties());
        return Admin.create(properties);
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    public void createTopics(@NotNull Set<String> set) {
        Admin admin = admin();
        try {
            createTopicsIfNeeded(admin, set, false);
            if (admin != null) {
                admin.close();
            }
        } catch (Throwable th) {
            if (admin != null) {
                try {
                    admin.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Override // io.goodforgod.testcontainers.extensions.kafka.KafkaConnection
    public void dropTopics(@NotNull Set<String> set) {
        try {
            Admin admin = admin();
            try {
                logger.trace("Looking for existing topics...");
                Set set2 = (Set) admin.listTopics().names().get(1L, TimeUnit.MINUTES);
                logger.debug("Found existing topics: {}", set2);
                Stream stream = set2.stream();
                Objects.requireNonNull(set);
                Set set3 = (Set) stream.filter((v1) -> {
                    return r1.contains(v1);
                }).collect(Collectors.toSet());
                if (set3.isEmpty()) {
                    logger.debug("Required topics already dropped: {}", set);
                } else {
                    logger.trace("Topics {} dropping...", set3);
                    KafkaFuture.allOf((KafkaFuture[]) admin.deleteTopics(set).topicNameValues().values().toArray(i -> {
                        return new KafkaFuture[i];
                    })).get(1L, TimeUnit.MINUTES);
                    logger.info("Required topics {} dropped", set3);
                }
                if (admin != null) {
                    admin.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + set, e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clear() {
        Iterator<ConsumerImpl> it = this.consumers.iterator();
        while (it.hasNext()) {
            try {
                it.next().close();
            } catch (Exception e) {
            }
        }
        this.consumers.clear();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;
        clear();
        if (this.admin != null) {
            try {
                this.admin.close(Duration.ofMinutes(1L));
                this.admin = null;
            } catch (Exception e) {
            }
        }
        if (this.producer != null) {
            try {
                this.producer.close(Duration.ofMinutes(1L));
                this.producer = null;
            } catch (Exception e2) {
            }
        }
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.params, ((KafkaConnectionImpl) obj).params);
    }

    public int hashCode() {
        return Objects.hash(this.params);
    }

    public String toString() {
        return this.params.bootstrapServers();
    }
}
