/*
 * Decompiled with CFR 0.152.
 */
package io.goodforgod.testcontainers.extensions.kafka;

import io.goodforgod.testcontainers.extensions.kafka.Event;
import io.goodforgod.testcontainers.extensions.kafka.KafkaConnection;
import io.goodforgod.testcontainers.extensions.kafka.KafkaConnectionClosableImpl;
import io.goodforgod.testcontainers.extensions.kafka.KafkaConnectionException;
import io.goodforgod.testcontainers.extensions.kafka.ReceivedEvent;
import io.goodforgod.testcontainers.extensions.kafka.ReceivedEventImpl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionTimeoutException;

@ApiStatus.Internal
class KafkaConnectionImpl
implements KafkaConnection {
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(500L);
    private static final Logger logger = LoggerFactory.getLogger(KafkaConnection.class);
    private volatile boolean isClosed = false;
    private volatile KafkaProducer<byte[], byte[]> producer;
    private volatile Admin admin;
    private final Map<String, ConsumerImpl> consumerByTopic = new ConcurrentHashMap<String, ConsumerImpl>();
    private final ParamsImpl params;
    @Nullable
    private final ParamsImpl paramsInNetwork;

    KafkaConnectionImpl(Properties properties, @Nullable Properties propertiesInNetwork) {
        this.params = new ParamsImpl(properties);
        this.paramsInNetwork = propertiesInNetwork == null ? null : new ParamsImpl(propertiesInNetwork);
    }

    @Override
    @NotNull
    public Optional<KafkaConnection.Params> paramsInNetwork() {
        return Optional.ofNullable(this.paramsInNetwork);
    }

    @Override
    @NotNull
    public KafkaConnection.Params params() {
        return this.params;
    }

    @Override
    @NotNull
    public KafkaConnection withProperties(@NotNull Properties properties) {
        Properties kafkaProperties = new Properties();
        kafkaProperties.putAll((Map<?, ?>)this.params.properties());
        kafkaProperties.putAll((Map<?, ?>)properties);
        Properties networkProperties = this.paramsInNetwork().map(props -> {
            Properties kafkaNetworkProperties = new Properties();
            kafkaNetworkProperties.putAll((Map<?, ?>)props.properties());
            kafkaNetworkProperties.putAll((Map<?, ?>)properties);
            return kafkaNetworkProperties;
        }).orElse(null);
        return new KafkaConnectionClosableImpl(kafkaProperties, networkProperties);
    }

    @Override
    public void send(@NotNull String topic, Event ... events) {
        this.send(topic, Arrays.asList(events));
    }

    @Override
    public void send(@NotNull String topic, @NotNull List<Event> events) {
        if (this.isClosed) {
            throw new KafkaConnectionException("Can't send cause was closed");
        }
        if (this.producer == null) {
            try {
                this.producer = KafkaConnectionImpl.getProducer(this.params.properties());
            }
            catch (Exception e) {
                throw new KafkaConnectionException("Can't create KafkaProducer", e);
            }
        }
        this.createTopicsIfNeeded(Set.of(topic), false);
        for (Event event : events) {
            byte[] key = event.key() == null ? null : event.key().asBytes();
            List headers = event.headers().isEmpty() ? null : event.headers().stream().map(header -> new RecordHeader(header.key(), header.value().asBytes())).collect(Collectors.toList());
            try {
                logger.trace("KafkaProducer sending event: {}", (Object)event);
                RecordMetadata result = (RecordMetadata)this.producer.send(new ProducerRecord(topic, null, (Object)key, (Object)event.value().asBytes(), headers)).get(10L, TimeUnit.SECONDS);
                logger.info("KafkaProducer sent event to topic '{}' with offset '{}' with partition '{}' with timestamp '{}' event: {}", new Object[]{topic, result.offset(), result.partition(), result.timestamp(), event});
            }
            catch (Exception e) {
                throw new KafkaConnectionException("KafkaProducer sent event failed: " + event, e);
            }
        }
    }

    @Override
    @NotNull
    public KafkaConnection.Consumer subscribe(String ... topics) {
        return this.subscribe(new HashSet<String>(Arrays.asList(topics)));
    }

    @Override
    @NotNull
    public KafkaConnection.Consumer subscribe(@NotNull Set<String> topics) {
        if (this.isClosed) {
            throw new KafkaConnectionException("Can't subscribed cause was closed");
        }
        try {
            Admin admin = this.getAdmin();
            KafkaConnectionImpl.createTopicsIfNeeded(admin, topics, false);
            Map topicInfo = (Map)Awaitility.await().atMost(Duration.ofMinutes(1L)).pollInterval(Duration.ofMillis(100L)).until(() -> {
                try {
                    return (Map)admin.describeTopics((Collection)topics).allTopicNames().get(10L, TimeUnit.SECONDS);
                }
                catch (Exception e) {
                    logger.warn(e.getMessage());
                    return Collections.emptyMap();
                }
            }, result -> result.values().stream().map(TopicDescription::name).collect(Collectors.toSet()).containsAll(topics));
            Set<TopicPartition> topicPartition = topicInfo.entrySet().stream().filter(e -> topics.contains(((TopicDescription)e.getValue()).name())).flatMap(e -> ((TopicDescription)e.getValue()).partitions().stream().map(p -> new TopicPartition(((TopicDescription)e.getValue()).name(), p.partition()))).collect(Collectors.toSet());
            String id = UUID.randomUUID().toString().substring(0, 8);
            String consumerTopicKey = topics.stream().sorted().collect(Collectors.joining(":"));
            ConsumerImpl consumer = this.consumerByTopic.computeIfAbsent(consumerTopicKey, k -> {
                KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaConnectionImpl.getConsumer(id, this.params.properties());
                return new ConsumerImpl(kafkaConsumer, id, topicPartition);
            });
            if (consumer.isClosed()) {
                KafkaConsumer<byte[], byte[]> kafkaConsumer = KafkaConnectionImpl.getConsumer(id, this.params.properties());
                ConsumerImpl activeConsumer = new ConsumerImpl(kafkaConsumer, id, topicPartition);
                this.consumerByTopic.put(consumerTopicKey, activeConsumer);
                return activeConsumer;
            }
            return consumer;
        }
        catch (Exception e2) {
            throw new KafkaConnectionException("Can't create KafkaConsumer", e2);
        }
    }

    private static KafkaProducer<byte[], byte[]> getProducer(Properties properties) {
        Properties producerProperties = new Properties();
        producerProperties.put("acks", "all");
        producerProperties.put("retries", "3");
        producerProperties.putAll((Map<?, ?>)properties);
        return new KafkaProducer(producerProperties, (Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
    }

    private static KafkaConsumer<byte[], byte[]> getConsumer(String consumerId, Properties properties) {
        Properties consumerProperties = new Properties();
        consumerProperties.put("auto.offset.reset", "latest");
        consumerProperties.put("enable.auto.commit", (Object)false);
        consumerProperties.put("max.poll.records", "5");
        consumerProperties.put("client.id", consumerId);
        consumerProperties.putAll((Map<?, ?>)properties);
        return new KafkaConsumer(consumerProperties, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
    }

    private static Admin getAdmin(Properties properties) {
        Properties adminProperties = new Properties();
        adminProperties.putAll((Map<?, ?>)properties);
        return Admin.create((Properties)adminProperties);
    }

    private Admin getAdmin() {
        if (this.admin == null) {
            this.admin = KafkaConnectionImpl.getAdmin(this.params().properties());
        }
        return this.admin;
    }

    void createTopicsIfNeeded(@NotNull Set<String> topics, boolean reset) {
        KafkaConnectionImpl.createTopicsIfNeeded(this.getAdmin(), topics, reset);
    }

    static void createTopicsIfNeeded(@NotNull Admin admin, @NotNull Set<String> topics, boolean reset) {
        try {
            logger.trace("Looking for existing topics...");
            Set existingTopics = (Set)admin.listTopics().names().get(1L, TimeUnit.MINUTES);
            logger.debug("Found existing topics: {}", (Object)existingTopics);
            Set topicsToCreate = topics.stream().filter(topic -> !existingTopics.contains(topic)).map(topic -> new NewTopic(topic, Optional.of(1), Optional.empty())).collect(Collectors.toSet());
            Set topicsToReset = existingTopics.stream().filter(topics::contains).collect(Collectors.toSet());
            if (!topicsToCreate.isEmpty()) {
                logger.trace("Topics {} creating...", topics);
                CreateTopicsResult result2 = admin.createTopics(topicsToCreate);
                result2.all().get(1L, TimeUnit.MINUTES);
                logger.info("Topics {} created", topics);
            } else if (reset && !topicsToReset.isEmpty()) {
                logger.trace("Topics {} already exist, but require reset, resetting...", topicsToReset);
                admin.deleteTopics(topicsToReset).all().get(1L, TimeUnit.MINUTES);
                logger.debug("Topics {} reset success", topicsToReset);
                Set topicsToCreateAfterReset = topicsToReset.stream().map(topic -> new NewTopic(topic, Optional.of(1), Optional.empty())).collect(Collectors.toSet());
                logger.trace("Topics {} reset status check...", topicsToReset);
                Awaitility.await().atMost(Duration.ofSeconds(35L)).pollInterval(Duration.ofMillis(100L)).until(() -> {
                    try {
                        return (Map)admin.describeTopics((Collection)topics).allTopicNames().get(10L, TimeUnit.SECONDS);
                    }
                    catch (Exception e) {
                        logger.warn(e.getMessage());
                        return Collections.emptyMap();
                    }
                }, result -> result.values().stream().map(TopicDescription::name).filter(topicsToReset::contains).findFirst().isEmpty());
                logger.debug("Topics {} reset status check success", topicsToReset);
                logger.trace("Topics {} recreating...", topicsToReset);
                Awaitility.await().atMost(Duration.ofSeconds(35L)).pollInterval(Duration.ofMillis(50L)).until(() -> {
                    try {
                        admin.createTopics((Collection)topicsToCreateAfterReset).all().get(10L, TimeUnit.SECONDS);
                        return true;
                    }
                    catch (ExecutionException e) {
                        if (e.getCause() instanceof TopicExistsException) {
                            Thread.sleep(500L);
                            return false;
                        }
                        throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e);
                    }
                    catch (Exception e) {
                        throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e);
                    }
                });
                logger.info("Topics {} recreated", topicsToReset);
            } else {
                logger.debug("Topics already exist: {}", topics);
            }
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof TopicExistsException) {
                logger.trace("Topics already exist exception received: {}", topics);
            }
            throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e);
        }
        catch (TopicExistsException e) {
            logger.trace("Topics already exist exception received: {}", topics);
        }
        catch (Exception e) {
            throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e);
        }
    }

    @Override
    @NotNull
    public Admin admin() {
        Properties adminProperties = new Properties();
        adminProperties.putAll((Map<?, ?>)this.params.properties());
        return Admin.create((Properties)adminProperties);
    }

    @Override
    public void createTopics(@NotNull Set<String> topics) {
        try (Admin admin = this.admin();){
            KafkaConnectionImpl.createTopicsIfNeeded(admin, topics, false);
        }
    }

    @Override
    public void dropTopics(@NotNull Set<String> topics) {
        try (Admin admin = this.admin();){
            logger.trace("Looking for existing topics...");
            Set existingTopics = (Set)admin.listTopics().names().get(1L, TimeUnit.MINUTES);
            logger.debug("Found existing topics: {}", (Object)existingTopics);
            Set topicsToDrop = existingTopics.stream().filter(topics::contains).collect(Collectors.toSet());
            if (!topicsToDrop.isEmpty()) {
                logger.trace("Topics {} dropping...", topicsToDrop);
                DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(topics);
                KafkaFuture[] deleteFutures = (KafkaFuture[])deleteTopicsResult.topicNameValues().values().toArray(KafkaFuture[]::new);
                KafkaFuture.allOf((KafkaFuture[])deleteFutures).get(1L, TimeUnit.MINUTES);
                logger.info("Required topics {} dropped", topicsToDrop);
            } else {
                logger.debug("Required topics already dropped: {}", topics);
            }
        }
        catch (Exception e) {
            throw new KafkaConnectionException("Kafka Admin operation failed for topics: " + topics, e);
        }
    }

    void clear() {
        for (ConsumerImpl consumer : this.consumerByTopic.values()) {
            try {
                consumer.stop();
            }
            catch (Exception exception) {}
        }
        this.consumerByTopic.clear();
    }

    void stop() {
        if (!this.isClosed) {
            this.isClosed = true;
            this.clear();
            if (this.admin != null) {
                try {
                    this.admin.close(Duration.ofMinutes(1L));
                    this.admin = null;
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            if (this.producer != null) {
                try {
                    this.producer.close(Duration.ofMinutes(1L));
                    this.producer = null;
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }

    @Override
    public void close() {
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        KafkaConnectionImpl that = (KafkaConnectionImpl)o;
        return Objects.equals(this.params, that.params);
    }

    public int hashCode() {
        return Objects.hash(this.params);
    }

    public String toString() {
        return this.params.bootstrapServers();
    }

    private static final class ParamsImpl
    implements KafkaConnection.Params {
        private final String bootstrapServers;
        private final Properties properties;

        private ParamsImpl(Properties properties) {
            this.bootstrapServers = properties.getProperty("bootstrap.servers");
            this.properties = properties;
        }

        @Override
        @NotNull
        public String bootstrapServers() {
            return this.bootstrapServers;
        }

        @Override
        @NotNull
        public Properties properties() {
            return this.properties;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ParamsImpl params = (ParamsImpl)o;
            return Objects.equals(this.bootstrapServers, params.bootstrapServers) && Objects.equals(this.properties, params.properties);
        }

        public int hashCode() {
            return Objects.hash(this.bootstrapServers, this.properties);
        }

        public String toString() {
            return this.bootstrapServers;
        }
    }

    static final class ConsumerImpl
    implements KafkaConnection.Consumer {
        private static final Logger logger = LoggerFactory.getLogger(KafkaConnection.Consumer.class);
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final List<ReceivedEvent> receivedPreviously = new CopyOnWriteArrayList<ReceivedEvent>();
        private final AtomicBoolean isActive = new AtomicBoolean(true);
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final BlockingQueue<ConsumerRecord<byte[], byte[]>> messageQueue = new LinkedBlockingDeque<ConsumerRecord<byte[], byte[]>>();
        private final Set<String> topics;
        private final String clientId;

        ConsumerImpl(KafkaConsumer<byte[], byte[]> consumer, String clientId, Collection<TopicPartition> topicPartitions) {
            this.consumer = consumer;
            this.clientId = clientId;
            this.topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
            logger.trace("KafkaConsumer topics {} assigning...", this.topics);
            this.consumer.assign(topicPartitions);
            Awaitility.await().atMost(Duration.ofMinutes(1L)).pollInterval(Duration.ofMillis(50L)).until(() -> {
                try {
                    return this.consumer.listTopics(Duration.ofSeconds(10L));
                }
                catch (Exception e) {
                    return Collections.emptyMap();
                }
            }, result -> new HashSet(result.keySet()).containsAll(this.topics));
            logger.debug("KafkaConsumer topics {} assigned", this.topics);
            logger.trace("KafkaConsumer topics {} poll starting", this.topics);
            this.poll(Duration.ofMillis(50L));
            this.executor.execute(this::launch);
            logger.debug("KafkaConsumer topics {} poll started.", this.topics);
        }

        private void launch() {
            logger.info("KafkaConsumer '{}' started consuming events from topics: {}", (Object)this.clientId, this.topics);
            while (this.isActive.get()) {
                try {
                    this.poll(POLL_TIMEOUT);
                }
                catch (InterruptException | WakeupException throwable) {
                }
                catch (Exception e) {
                    logger.error("KafkaConsumer '{}' for {} topics got unhandled exception", new Object[]{this.clientId, this.topics, e});
                    this.consumer.close(Duration.ofMinutes(2L));
                    throw e;
                }
            }
        }

        private void poll(Duration maxPollTimeout) {
            ConsumerRecords records = this.consumer.poll(maxPollTimeout);
            if (!records.isEmpty()) {
                logger.info("KafkaConsumer '{}' polled '{}' records from topics: {}", new Object[]{this.clientId, records.count(), this.topics});
            } else {
                logger.trace("KafkaConsumer '{}' polled '{}' records from topics {}...", new Object[]{this.clientId, records.count(), this.topics});
            }
            for (ConsumerRecord record : records) {
                this.messageQueue.offer((ConsumerRecord<byte[], byte[]>)record);
            }
        }

        @Override
        @NotNull
        public List<ReceivedEvent> receivedPreviously() {
            return List.copyOf(this.receivedPreviously);
        }

        @Override
        @NotNull
        public Optional<ReceivedEvent> getReceived(@NotNull Duration timeout) {
            try {
                ConsumerRecord<byte[], byte[]> received = this.messageQueue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
                if (received == null) {
                    return Optional.empty();
                }
                ReceivedEventImpl event = new ReceivedEventImpl(received);
                this.receivedPreviously.add(event);
                return Optional.of(event);
            }
            catch (InterruptedException e) {
                return (Optional)Assertions.fail((String)("Expected to receive 1 event, but was interrupted: " + e.getMessage()));
            }
        }

        @Override
        @NotNull
        public ReceivedEvent getReceivedAtLeastOne(@NotNull Duration timeout) {
            return this.getReceivedAtLeast(1, timeout).get(0);
        }

        @Override
        @NotNull
        public List<ReceivedEvent> getReceivedAtLeast(int expectedEvents, @NotNull Duration timeout) {
            CopyOnWriteArrayList<ReceivedEventImpl> receivedEvents = new CopyOnWriteArrayList<ReceivedEventImpl>();
            ArrayList drainTo = new ArrayList();
            this.messageQueue.drainTo(drainTo, expectedEvents);
            for (ConsumerRecord consumerRecord : drainTo) {
                ReceivedEventImpl event = new ReceivedEventImpl((ConsumerRecord<byte[], byte[]>)consumerRecord);
                receivedEvents.add(event);
            }
            if (receivedEvents.size() == expectedEvents) {
                this.receivedPreviously.addAll(receivedEvents);
                return List.copyOf(receivedEvents);
            }
            try {
                Awaitility.await().atMost(timeout).until(() -> {
                    try {
                        ConsumerRecord<byte[], byte[]> received = this.messageQueue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
                        if (received == null) {
                            return receivedEvents;
                        }
                        ReceivedEventImpl event = new ReceivedEventImpl(received);
                        receivedEvents.add(event);
                        return receivedEvents;
                    }
                    catch (InterruptedException e) {
                        return (List)Assertions.fail((String)String.format("Expected to receive at least %s event, but was interrupted: %s", expectedEvents, e.getMessage()));
                    }
                }, received -> received.size() >= expectedEvents);
            }
            catch (ConditionTimeoutException conditionTimeoutException) {
                // empty catch block
            }
            this.receivedPreviously.addAll(receivedEvents);
            return List.copyOf(receivedEvents);
        }

        private List<ReceivedEvent> getReceivedEqualsInTime(int expected, @NotNull Duration timeToWait) {
            try {
                Thread.sleep(timeToWait.toMillis());
                ArrayList<ReceivedEventImpl> receivedEvents = new ArrayList<ReceivedEventImpl>();
                ArrayList drainTo = new ArrayList();
                this.messageQueue.drainTo(drainTo);
                for (ConsumerRecord consumerRecord : drainTo) {
                    ReceivedEventImpl event = new ReceivedEventImpl((ConsumerRecord<byte[], byte[]>)consumerRecord);
                    receivedEvents.add(event);
                }
                this.receivedPreviously.addAll(receivedEvents);
                return List.copyOf(receivedEvents);
            }
            catch (InterruptedException e) {
                return (List)Assertions.fail((String)String.format("Expected to receive %s event, but was interrupted: %s", expected, e.getMessage()));
            }
        }

        @Override
        public void assertReceivedNone(@NotNull Duration timeToWait) {
            if (!this.checkReceivedNone(timeToWait)) {
                Assertions.fail((String)"Expected to receive 0 events, but received at least 1 event");
            }
        }

        @Override
        @NotNull
        public ReceivedEvent assertReceivedAtLeastOne(@NotNull Duration timeout) {
            return this.assertReceivedAtLeast(1, timeout).get(0);
        }

        @Override
        @NotNull
        public List<ReceivedEvent> assertReceivedAtLeast(int expectedAtLeast, @NotNull Duration timeout) {
            List<ReceivedEvent> received = this.getReceivedAtLeast(expectedAtLeast, timeout);
            if (received.size() < expectedAtLeast) {
                return (List)Assertions.fail((String)String.format("Expected to receive at least %s event, but received %s events", expectedAtLeast, received.size()));
            }
            return received;
        }

        @Override
        @NotNull
        public List<ReceivedEvent> assertReceivedEqualsInTime(int expected, @NotNull Duration timeToWait) {
            List<ReceivedEvent> received = this.getReceivedEqualsInTime(expected, timeToWait);
            if (received.size() != expected) {
                return (List)Assertions.fail((String)String.format("Expected to receive %s event, but received %s events", expected, received.size()));
            }
            return received;
        }

        @Override
        public boolean checkReceivedNone(@NotNull Duration timeToWait) {
            try {
                ConsumerRecord<byte[], byte[]> received = this.messageQueue.poll(timeToWait.toMillis(), TimeUnit.MILLISECONDS);
                if (received == null) {
                    return true;
                }
                ReceivedEventImpl event = new ReceivedEventImpl(received);
                this.receivedPreviously.add(event);
                return false;
            }
            catch (InterruptedException e) {
                return (Boolean)Assertions.fail((String)("Expected to receive 0 event, but was interrupted: " + e.getMessage()));
            }
        }

        @Override
        public boolean checkReceivedAtLeast(int expectedAtLeast, @NotNull Duration timeout) {
            List<ReceivedEvent> received = this.getReceivedAtLeast(expectedAtLeast, timeout);
            return received.size() >= expectedAtLeast;
        }

        @Override
        public boolean checkReceivedEqualsInTime(int expected, @NotNull Duration timeToWait) {
            List<ReceivedEvent> received = this.getReceivedEqualsInTime(expected, timeToWait);
            return received.size() == expected;
        }

        @Override
        public void reset() {
            this.receivedPreviously.clear();
            this.messageQueue.clear();
        }

        boolean isClosed() {
            return !this.isActive.get();
        }

        @Override
        public void close() {
            this.stop();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void stop() {
            if (this.isActive.compareAndSet(true, false)) {
                logger.debug("Stopping KafkaConsumer '{}' for {} topics...", (Object)this.clientId, this.topics);
                long started = System.nanoTime();
                try {
                    this.executor.shutdownNow();
                    this.consumer.wakeup();
                    this.executor.awaitTermination(1L, TimeUnit.MINUTES);
                    this.consumer.close(Duration.ofMinutes(2L));
                    this.reset();
                }
                catch (Exception exception) {
                    this.reset();
                    logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - started)});
                }
                catch (Throwable throwable) {
                    this.reset();
                    logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - started)});
                    throw throwable;
                }
                logger.info("Stopped KafkaConsumer '{}' for {} topics took {}", new Object[]{this.clientId, this.topics, Duration.ofNanos(System.nanoTime() - started)});
            }
        }
    }
}

