package io.axual.client.consumer.base;

import io.axual.client.config.BaseConsumerConfig;
import io.axual.client.consumer.Consumer;
import io.axual.client.consumer.Processor;
import io.axual.client.exception.ConsumeFailedException;
import io.axual.client.exception.RetriableException;
import io.axual.client.exception.SkippableException;
import io.axual.client.janitor.Janitor;
import io.axual.common.exception.ClientException;
import io.axual.common.tools.ExecutorUtil;
import io.axual.common.tools.SleepUtil;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link Consumer} implementation that repeatedly fetches batches of messages from a
 * {@link BaseMessageSource} and passes each message to a {@link Processor}.
 *
 * <p>The consume loop runs on a single-threaded executor owned by this instance. State that is
 * touched by both the calling thread and the consume thread is kept in {@link AtomicBoolean}s or
 * in a {@code volatile} field.
 *
 * @param <K> key type of the consumed messages
 * @param <V> value type of the consumed messages
 */
public abstract class BaseConsumer<K, V> extends Janitor.ManagedCloseable implements Consumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseConsumer.class);
    private static final String NEWLINE = "\n";
    private static final String LOG_ID = "ID: {}\n";
    private static final String LOG_KEY = "KEY: {}\n";
    private static final String LOG_VALUE = "VALUE: {}\n";
    /** Log format suffix that prints only the message id. */
    private static final String LOG_ID_ONLY = NEWLINE + LOG_ID;
    /** Log format suffix that prints the message id, key and value. */
    private static final String LOG_ID_KEY_VALUE = NEWLINE + LOG_ID + LOG_KEY + LOG_VALUE;

    private final BaseMessageSource<K, V> messageSource;
    private final Processor<K, V> processor;
    /** {@code true} from a successful {@link #startConsuming()} until the consume task ends. */
    private final AtomicBoolean consumeThreadRunning = new AtomicBoolean(false);
    /** Set by {@link #stopConsuming()}; this class never resets it to {@code false}. */
    private final AtomicBoolean stopConsumer = new AtomicBoolean(false);
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    /**
     * Result handle of the most recently submitted consume task. Volatile because it is written
     * by the thread calling {@link #startConsuming()} and polled by the consume thread.
     */
    private volatile Future<ConsumeFailedException> consumeResult = null;

    /**
     * Creates a consumer that reads from the given source and delegates to the given processor.
     *
     * @param baseMessageSource source the message batches are fetched from
     * @param processor         callback invoked for every fetched message
     */
    public BaseConsumer(BaseMessageSource<K, V> baseMessageSource, Processor<K, V> processor) {
        this.messageSource = baseMessageSource;
        this.processor = processor;
        LOG.info("Created consumer with source of class: {}.", baseMessageSource.getClass().getName());
    }

    /**
     * Returns the consumer configuration held by the message source.
     *
     * @return the configuration reported by the message source
     */
    @Override
    public BaseConsumerConfig<K, V> getConfig() {
        return messageSource.getConsumerConfig();
    }

    /**
     * Submits the consume loop to the executor unless a consume task is already running.
     *
     * @return the future of the submitted task, which completes with {@code null} or with the
     *         failure that ended the loop; {@code null} if a task was already running
     */
    @Override
    public Future<ConsumeFailedException> startConsuming() {
        if (!consumeThreadRunning.compareAndSet(false, true)) {
            LOG.warn("There is already a process running, no new process is started.");
            return null;
        }
        try {
            final Future<ConsumeFailedException> submitted = executorService.submit(() -> {
                try {
                    return consumeLoop();
                } catch (ConsumeFailedException e) {
                    return e;
                } finally {
                    consumeThreadRunning.set(false);
                }
            });
            consumeResult = submitted;
            return submitted;
        } catch (RuntimeException e) {
            // The task never started, so its finally block will not run: release the flag here,
            // otherwise this consumer would report itself as running forever.
            consumeThreadRunning.set(false);
            throw e;
        }
    }

    /**
     * Raises the stop flag and, if a consume task is still running, waits for its result.
     *
     * @return the failure the running task ended with, or {@code null} if it ended normally, if
     *         nothing was ever started, or if the task had already finished
     */
    @Override
    public ConsumeFailedException stopConsuming() {
        stopConsumer.set(true);
        if (consumeResult == null || !consumeThreadRunning.get()) {
            return null;
        }
        return getConsumeResult();
    }

    /**
     * Reports whether a consume task is currently running.
     *
     * @return {@code true} while a consume task is running
     */
    @Override
    public boolean isConsuming() {
        return consumeThreadRunning.get();
    }

    /**
     * Blocks until the most recently submitted consume task completes and returns its outcome.
     *
     * @return the failure the task ended with; {@code null} if no task was ever submitted, the
     *         task was cancelled, or it ended without failure; a new
     *         {@link ConsumeFailedException} wrapping any other exception raised while waiting
     */
    @Override
    public ConsumeFailedException getConsumeResult() {
        final Future<ConsumeFailedException> pending = consumeResult;
        if (pending == null) {
            return null;
        }
        try {
            return pending.get();
        } catch (CancellationException e) {
            return null;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            return new ConsumeFailedException(e);
        } catch (Exception e) {
            return new ConsumeFailedException(e);
        }
    }

    /**
     * Fetches and processes message batches until the stop flag is set or a batch fails.
     *
     * <p>The method first polls every 50 ms until {@code consumeResult} has been assigned, so it
     * does not return if that field is never set. After each batch, including one cut short by a
     * stop request, {@code onAfterProcessBatch()} is invoked on the message source.
     *
     * @return {@code null} when the loop ended because of a stop request, otherwise a
     *         {@link ConsumeFailedException} wrapping the exception that aborted the batch
     */
    public ConsumeFailedException consumeLoop() {
        while (consumeResult == null) {
            SleepUtil.sleep(Duration.ofMillis(50L));
        }
        while (!stopConsumer.get()) {
            try {
                final List<BaseMessage<K, V>> messages = messageSource.getMessages();
                LOG.trace("Fetched {} messages", messages.size());
                for (final BaseMessage<K, V> message : messages) {
                    handleMessage(message);
                    if (stopConsumer.get()) {
                        LOG.warn("Consumer requested to stop, exiting batch processing loop");
                        break;
                    }
                }
                messageSource.onAfterProcessBatch();
                LOG.trace("Processed message batch");
            } catch (Exception e) {
                LOG.error("Error processing message batch", e);
                return new ConsumeFailedException(e, messageSource.getInfo());
            }
        }
        return null;
    }

    /**
     * Passes one message to the processor, retrying for as long as the processor throws
     * {@link RetriableException} and no stop has been requested.
     *
     * <p>After every attempt the message source is notified through
     * {@code onAfterProcessMessage}, with {@code null} on success or the thrown exception
     * otherwise. Key and value are only included in warning and error logs when trace logging is
     * enabled.
     *
     * @param baseMessage the message to process
     * @throws ConsumeFailedException if the processor throws anything other than a
     *                                {@link RetriableException} or {@link SkippableException}
     */
    private void handleMessage(BaseMessage<K, V> baseMessage) {
        int attempt = 0;
        boolean done = false;
        while (!done && !stopConsumer.get()) {
            try {
                attempt++;
                LOG.trace("Processing message (try #{}):" + LOG_ID_KEY_VALUE,
                        attempt, baseMessage.getId(), baseMessage.getKey(), baseMessage.getValue());
                processor.processMessage(baseMessage);
                LOG.debug("Message successfully processed");
                messageSource.onAfterProcessMessage(baseMessage, null);
                done = true;
            } catch (RetriableException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.warn("Message could not be processed, retrying in {} ms." + LOG_ID_KEY_VALUE,
                            e.getSleepTime(), baseMessage.getId(), baseMessage.getKey(), baseMessage.getValue());
                } else {
                    LOG.warn("Message could not be processed, retrying in {} ms." + LOG_ID_ONLY,
                            e.getSleepTime(), baseMessage.getId());
                }
                messageSource.onAfterProcessMessage(baseMessage, e);
                // Back off for the time requested by the exception before the next attempt.
                SleepUtil.sleep(e.getSleepTime());
            } catch (SkippableException e) {
                if (LOG.isTraceEnabled()) {
                    LOG.warn("Skipping unprocessable message:" + LOG_ID_KEY_VALUE,
                            baseMessage.getId(), baseMessage.getKey(), baseMessage.getValue());
                } else {
                    LOG.warn("Skipping unprocessable message:" + LOG_ID_ONLY, baseMessage.getId());
                }
                messageSource.onAfterProcessMessage(baseMessage, e);
                done = true;
            } catch (Exception e) {
                if (LOG.isTraceEnabled()) {
                    LOG.error("Message could not be processed:" + LOG_ID_KEY_VALUE + "EXCEPTION: {}",
                            baseMessage.getId(), baseMessage.getKey(), baseMessage.getValue(), e);
                } else {
                    LOG.error("Message could not be processed:" + LOG_ID_ONLY + "EXCEPTION: {}",
                            baseMessage.getId(), e);
                }
                messageSource.onAfterProcessMessage(baseMessage, e);
                throw new ConsumeFailedException(e);
            }
        }
    }

    /**
     * Stops consuming, terminates the executor, closes the message source and then the parent.
     *
     * <p>A failure while closing the message source is logged and does not prevent the remaining
     * steps. A failure returned by {@link #stopConsuming()} is rethrown last, after all resources
     * have been released.
     */
    @Override
    public void close() {
        final ClientException consumeFailure = stopConsuming();
        ExecutorUtil.terminateExecutor(executorService, Duration.ofSeconds(10L));
        try {
            messageSource.close();
        } catch (Exception e) {
            LOG.error("Error during closing of the message source", e);
        }
        super.close();
        if (consumeFailure != null) {
            throw consumeFailure;
        }
    }
}
