package house.inksoftware.systemtest.domain.kafka;

import house.inksoftware.systemtest.domain.utils.JsonUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.test.utils.KafkaTestUtils;

/* loaded from: input_file:house/inksoftware/systemtest/domain/kafka/KafkaBackgroundConsumerService.class */
/**
 * Polls a Kafka {@link Consumer} on a background thread and keeps every record it reads, grouped
 * by topic, so that a test can later wait for an expected message with
 * {@link #find(String, String)}.
 *
 * <p>The record store is written by the polling thread and read by whichever thread evaluates the
 * wait condition, so it is backed by concurrent collections.
 */
public class KafkaBackgroundConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaBackgroundConsumerService.class);
    private final Executor executor = Executors.newFixedThreadPool(4);
    // Topic name -> records read from that topic, in arrival order.
    private final Map<String, List<ConsumedRecord>> fetchedRecords = new ConcurrentHashMap<>();
    private final Consumer<String, Object> consumer;

    /**
     * A record body read from Kafka together with a flag telling whether a test has already
     * matched it.
     */
    public static class ConsumedRecord {
        private String body;
        private boolean readByTest;

        /**
         * Creates a record holding the given body that has not been read by a test yet.
         *
         * @param str the record body
         * @return the new record
         */
        public static ConsumedRecord record(String str) {
            ConsumedRecord consumedRecord = new ConsumedRecord();
            consumedRecord.setBody(str);
            return consumedRecord;
        }

        public String getBody() {
            return this.body;
        }

        public boolean isReadByTest() {
            return this.readByTest;
        }

        public void setBody(String str) {
            this.body = str;
        }

        public void setReadByTest(boolean z) {
            this.readByTest = z;
        }

        /** Two records are equal when both their body and their read flag are equal. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof ConsumedRecord)) {
                return false;
            }
            ConsumedRecord other = (ConsumedRecord) obj;
            if (!other.canEqual(this) || isReadByTest() != other.isReadByTest()) {
                return false;
            }
            String ownBody = getBody();
            String otherBody = other.getBody();
            return ownBody == null ? otherBody == null : ownBody.equals(otherBody);
        }

        /** Hook that lets a subclass refuse equality with instances of other types. */
        protected boolean canEqual(Object obj) {
            return obj instanceof ConsumedRecord;
        }

        @Override
        public int hashCode() {
            int result = 59 + (isReadByTest() ? 79 : 97);
            String ownBody = getBody();
            return (result * 59) + (ownBody == null ? 43 : ownBody.hashCode());
        }

        @Override
        public String toString() {
            return "KafkaBackgroundConsumerService.ConsumedRecord(body=" + getBody() + ", readByTest=" + isReadByTest() + ")";
        }
    }

    /**
     * Creates a service that reads from the given consumer. Polling does not start until
     * {@link #initiate()} is called.
     *
     * @param consumer the Kafka consumer to poll
     */
    public KafkaBackgroundConsumerService(Consumer<String, Object> consumer) {
        this.consumer = consumer;
    }

    /**
     * Submits the polling loop to the executor. The loop repeatedly fetches records with a
     * 1000 ms timeout and stores each of them under its topic; it has no exit condition.
     */
    public void initiate() {
        this.executor.execute(() -> {
            while (true) {
                try {
                    KafkaTestUtils.getRecords(this.consumer, 1000L)
                            .forEach(consumerRecord -> store(consumerRecord.topic(), consumerRecord.value()));
                } catch (IllegalStateException e) {
                    // Best effort: a failed poll must not stop the loop, but leave a trace of it.
                    log.debug("Ignoring IllegalStateException while polling Kafka, retrying", e);
                }
            }
        });
    }

    /**
     * Stores one consumed value under its topic. Records without a value are skipped, because
     * calling {@code toString()} on them would throw and terminate the polling task.
     */
    private void store(String topic, Object value) {
        if (value == null) {
            log.debug("Skipping record without a value from topic {}", topic);
            return;
        }
        String body = value.toString();
        log.info("Read record {} from topic {}", body, topic);
        // computeIfAbsent creates the per-topic list atomically; the list itself is safe to
        // iterate while the polling thread keeps appending to it.
        this.fetchedRecords
                .computeIfAbsent(topic, key -> new CopyOnWriteArrayList<>())
                .add(ConsumedRecord.record(body));
    }

    /** Returns the records stored for the given topic, or an empty list when there are none. */
    private List<ConsumedRecord> find(String str) {
        // ConcurrentHashMap rejects null keys, so a null topic is answered without a lookup.
        List<ConsumedRecord> records = str == null ? null : this.fetchedRecords.get(str);
        return records == null ? new ArrayList<>() : records;
    }

    /**
     * Waits until a record that has not been matched before and whose body is equal to the
     * expected JSON (as decided by {@code JsonUtils.isEqual}) is available on the topic, then
     * marks that record as read. The store is checked every 2 seconds for at most 30 seconds.
     *
     * @param str the topic the record is expected on
     * @param str2 the expected record body
     * @throws IllegalStateException if no matching record shows up in time
     */
    public void find(String str, String str2) {
        try {
            Awaitility.await().atMost(Duration.ofSeconds(30L)).with().pollInterval(Duration.ofSeconds(2L)).until(() -> {
                Optional<ConsumedRecord> match = find(str).stream()
                        .filter(consumedRecord -> !consumedRecord.isReadByTest()
                                && JsonUtils.isEqual(str2, consumedRecord.getBody()))
                        .findAny();
                // Mark the record so that a later expectation cannot match it again.
                match.ifPresent(consumedRecord -> consumedRecord.setReadByTest(true));
                return match.isPresent();
            });
        } catch (ConditionTimeoutException e) {
            throw new IllegalStateException("It was expected that there would be an event outputed to " + str + ", but it didn't happen", e);
        }
    }
}
