/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.messages;

import com.github.joschi.jadconfig.util.Duration;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.graylog2.indexer.IndexFailure;
import org.graylog2.indexer.IndexFailureImpl;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.AutoValue_Messages_IndexingError;
import org.graylog2.indexer.messages.DocumentNotFoundException;
import org.graylog2.indexer.messages.IndexBlockRetryAttempt;
import org.graylog2.indexer.messages.Indexable;
import org.graylog2.indexer.messages.IndexingRequest;
import org.graylog2.indexer.messages.MessagesAdapter;
import org.graylog2.indexer.messages.TrafficAccounting;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.system.processing.ProcessingStatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for reading single messages and for bulk-indexing messages through a {@link MessagesAdapter}.
 *
 * <p>Bulk indexing retries the whole request while the adapter throws an {@link IOException}, keeps
 * retrying items that were rejected with {@link IndexingError.ErrorType#IndexBlocked} until none are left,
 * and publishes all other failures to a bounded queue (see {@link #getIndexFailureQueue()}).
 */
@Singleton
public class Messages {
    private static final Logger LOG = LoggerFactory.getLogger(Messages.class);

    /** Upper bound handed to both exponential wait strategies below. */
    private static final Duration MAX_WAIT_TIME = Duration.seconds(30L);

    /**
     * Multiplier handed to {@link #exponentialWaitSeconds}.
     * NOTE(review): presumably chosen so that the strategy's progression yields waits in whole seconds -
     * confirm against the guava-retrying documentation of {@code WaitStrategies.exponentialWait}.
     */
    private static final int retrySecondsMultiplier = 500;

    /** Capacity of the queue that buffers batches of index failures. */
    private static final int INDEX_FAILURE_QUEUE_CAPACITY = 1000;

    /** How long {@link #propagateFailure(Collection)} waits for free space in the failure queue. */
    private static final long INDEX_FAILURE_OFFER_TIMEOUT_MS = 25L;

    /** Wait strategy used between retries of items that failed because their index is blocked. */
    @VisibleForTesting
    static final WaitStrategy exponentialWaitSeconds =
            WaitStrategies.exponentialWait(retrySecondsMultiplier, MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit());

    /** Wait strategy used between retries of a bulk request that failed with an {@link IOException}. */
    @VisibleForTesting
    static final WaitStrategy exponentialWaitMilliseconds =
            WaitStrategies.exponentialWait(MAX_WAIT_TIME.getQuantity(), MAX_WAIT_TIME.getUnit());

    /**
     * Retries a bulk request for as long as the thrown exception is an {@link IOException}.
     * No stop strategy is configured here, so the library's default applies.
     */
    private static final Retryer<List<IndexingError>> BULK_REQUEST_RETRYER = RetryerBuilder.<List<IndexingError>>newBuilder()
            .retryIfException(t -> t instanceof IOException)
            .withWaitStrategy(exponentialWaitMilliseconds)
            .withRetryListener(new RetryListener() {
                @Override
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        LOG.error("Caught exception during bulk indexing: {}, retrying (attempt #{}).", attempt.getExceptionCause(), attempt.getAttemptNumber());
                    } else if (attempt.getAttemptNumber() > 1L) {
                        // Only worth mentioning when at least one earlier attempt had failed.
                        LOG.info("Bulk indexing finally successful (attempt #{}).", attempt.getAttemptNumber());
                    }
                }
            })
            .build();

    private final LinkedBlockingQueue<List<IndexFailure>> indexFailureQueue;
    private final MessagesAdapter messagesAdapter;
    private final ProcessingStatusRecorder processingStatusRecorder;
    private final TrafficAccounting trafficAccounting;

    /**
     * Creates the service with its collaborators and an empty, bounded index failure queue.
     *
     * @param trafficAccounting        receives the accumulated size of indexed messages
     * @param messagesAdapter          performs the actual get/analyze/bulk-index calls
     * @param processingStatusRecorder receives the receive time of successfully indexed messages
     */
    @Inject
    public Messages(TrafficAccounting trafficAccounting, MessagesAdapter messagesAdapter, ProcessingStatusRecorder processingStatusRecorder) {
        this.trafficAccounting = trafficAccounting;
        this.messagesAdapter = messagesAdapter;
        this.processingStatusRecorder = processingStatusRecorder;
        this.indexFailureQueue = new LinkedBlockingQueue<>(INDEX_FAILURE_QUEUE_CAPACITY);
    }

    /**
     * Loads a single message by delegating to the adapter.
     *
     * @param messageId id of the message to load
     * @param index     name of the index to look in
     * @return the adapter's result for the given id and index
     * @throws DocumentNotFoundException propagated from the adapter
     * @throws IOException               propagated from the adapter
     */
    public ResultMessage get(String messageId, String index) throws DocumentNotFoundException, IOException {
        return messagesAdapter.get(messageId, index);
    }

    /**
     * Runs the given analyzer on a string by delegating to the adapter.
     *
     * @param toAnalyze text to analyze
     * @param index     name of the index to use
     * @param analyzer  name of the analyzer to apply
     * @return the adapter's result for the analysis
     * @throws IOException propagated from the adapter
     */
    public List<String> analyze(String toAnalyze, String index, String analyzer) throws IOException {
        return messagesAdapter.analyze(toAnalyze, index, analyzer);
    }

    /**
     * Bulk-indexes the given messages and accounts them as output (non-system) traffic.
     *
     * @param messageList pairs of target index set and message
     * @return see {@link #bulkIndex(List, boolean)}
     */
    public List<String> bulkIndex(List<Map.Entry<IndexSet, Message>> messageList) {
        return bulkIndex(messageList, false);
    }

    /**
     * Bulk-indexes the given messages.
     *
     * @param messageList     pairs of target index set and message; an empty list is a no-op
     * @param isSystemTraffic whether the message sizes are accounted as system traffic instead of output traffic
     * @return see {@link #bulkIndexRequests(List, boolean)}; an empty list if {@code messageList} is empty
     */
    public List<String> bulkIndex(List<Map.Entry<IndexSet, Message>> messageList, boolean isSystemTraffic) {
        if (messageList.isEmpty()) {
            return Collections.emptyList();
        }

        final List<IndexingRequest> indexingRequestList = messageList.stream()
                .map(entry -> IndexingRequest.create(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());

        return bulkIndexRequests(indexingRequestList, isSystemTraffic);
    }

    /**
     * Bulk-indexes the given requests, retrying items whose index is blocked until they are no longer
     * rejected for that reason, and publishes the remaining failures to the index failure queue.
     *
     * @param indexingRequestList requests to index
     * @param isSystemTraffic     whether the message sizes are accounted as system traffic instead of output traffic
     * @return the {@code letterId} of every index failure that remained after all retries
     * @throws RuntimeException if the bulk request ultimately fails or the thread is interrupted while waiting
     */
    public List<String> bulkIndexRequests(List<IndexingRequest> indexingRequestList, boolean isSystemTraffic) {
        final List<IndexingError> indexingErrors = runBulkRequest(indexingRequestList, indexingRequestList.size());
        final Set<IndexingError> remainingErrors = retryOnlyIndexBlockItemsForever(indexingRequestList, indexingErrors);

        final Set<String> failedIds = remainingErrors.stream()
                .map(indexingError -> indexingError.message().getId())
                .collect(Collectors.toSet());
        final List<IndexingRequest> successfulRequests = indexingRequestList.stream()
                .filter(indexingRequest -> !failedIds.contains(indexingRequest.message().getId()))
                .collect(Collectors.toList());

        recordTimestamp(successfulRequests);
        // NOTE(review): traffic is accounted for every request, including the ones that failed - confirm intended.
        accountTotalMessageSizes(indexingRequestList, isSystemTraffic);

        return propagateFailure(remainingErrors);
    }

    /**
     * Re-submits the items that failed with {@link IndexingError.ErrorType#IndexBlocked} until a bulk request
     * no longer reports any such item. There is no upper bound on the number of attempts.
     *
     * @param messages       all requests of the original bulk request
     * @param allFailedItems errors reported by the original bulk request
     * @return every error that was not caused by a blocked index, from the original request and from all retries
     */
    private Set<IndexingError> retryOnlyIndexBlockItemsForever(List<IndexingRequest> messages, List<IndexingError> allFailedItems) {
        Set<IndexingError> indexBlocks = indexBlocksFrom(allFailedItems);
        final Set<IndexingError> otherFailures = new HashSet<>(Sets.difference(new HashSet<>(allFailedItems), indexBlocks));
        List<IndexingRequest> blockedMessages = messagesForResultItems(messages, indexBlocks);

        if (!indexBlocks.isEmpty()) {
            LOG.warn("Retrying {} messages, because their indices are blocked with status [read-only / allow delete]", indexBlocks.size());
        }

        long attempt = 1L;
        while (!indexBlocks.isEmpty()) {
            waitBeforeRetrying(attempt++);

            // Only the still-blocked messages are re-sent, so that is the count reported on failure.
            final List<IndexingError> failedItems = runBulkRequest(blockedMessages, blockedMessages.size());

            indexBlocks = indexBlocksFrom(failedItems);
            blockedMessages = messagesForResultItems(blockedMessages, indexBlocks);

            // A retried item may now fail for a different reason; keep those so they are reported.
            final Set<IndexingError> newOtherFailures = Sets.difference(new HashSet<>(failedItems), indexBlocks);
            otherFailures.addAll(newOtherFailures);

            if (indexBlocks.isEmpty()) {
                LOG.info("Retries were successful after {} attempts. Ingestion will continue now.", attempt);
            }
        }

        return otherFailures;
    }

    /**
     * Selects the requests whose message id matches one of the given errors.
     *
     * @param chunk       requests to select from
     * @param indexBlocks errors whose message ids determine the selection
     * @return the matching requests, in the order they appear in {@code chunk}
     */
    private List<IndexingRequest> messagesForResultItems(List<IndexingRequest> chunk, Set<IndexingError> indexBlocks) {
        final Set<String> blockedMessageIds = indexBlocks.stream()
                .map(item -> item.message().getId())
                .collect(Collectors.toSet());

        return chunk.stream()
                .filter(entry -> blockedMessageIds.contains(entry.message().getId()))
                .collect(Collectors.toList());
    }

    /** Returns the subset of the given errors whose type is {@link IndexingError.ErrorType#IndexBlocked}. */
    private Set<IndexingError> indexBlocksFrom(List<IndexingError> allFailedItems) {
        return allFailedItems.stream()
                .filter(this::hasFailedDueToBlockedIndex)
                .collect(Collectors.toSet());
    }

    /** Tells whether the given error has type {@link IndexingError.ErrorType#IndexBlocked}. */
    private boolean hasFailedDueToBlockedIndex(IndexingError indexingError) {
        return indexingError.errorType() == IndexingError.ErrorType.IndexBlocked;
    }

    /**
     * Sleeps for the time {@link #exponentialWaitSeconds} computes for the given attempt number.
     *
     * @param attempt number of the retry attempt that is about to be made
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
     */
    private void waitBeforeRetrying(long attempt) {
        try {
            final long sleepTime = exponentialWaitSeconds.computeSleepTime(new IndexBlockRetryAttempt(attempt));
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends the requests to the adapter through {@link #BULK_REQUEST_RETRYER}.
     *
     * @param indexingRequestList requests to send
     * @param count               number of messages to mention in error logs
     * @return the errors reported by the adapter
     * @throws RuntimeException wrapping the {@link RetryException} or {@link ExecutionException} of the retryer
     */
    private List<IndexingError> runBulkRequest(List<IndexingRequest> indexingRequestList, int count) {
        try {
            return BULK_REQUEST_RETRYER.call(() -> messagesAdapter.bulkIndex(indexingRequestList));
        } catch (RetryException e) {
            LOG.error("Could not bulk index {} messages. Giving up after {} attempts.", count, e.getNumberOfFailedAttempts());
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The trailing throwable is logged with its stack trace, not substituted into the message.
            LOG.error("Couldn't bulk index {} messages.", count, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Sums the sizes of the given requests' messages and reports the total to the traffic accounting.
     *
     * @param requests        requests whose message sizes are summed
     * @param isSystemTraffic {@code true} to report system traffic, {@code false} to report output traffic
     */
    private void accountTotalMessageSizes(List<IndexingRequest> requests, boolean isSystemTraffic) {
        final long totalSizeOfIndexedMessages = requests.stream()
                .map(IndexingRequest::message)
                .mapToLong(Indexable::getSize)
                .sum();

        if (isSystemTraffic) {
            trafficAccounting.addSystemTraffic(totalSizeOfIndexedMessages);
        } else {
            trafficAccounting.addOutputTraffic(totalSizeOfIndexedMessages);
        }
    }

    /** Reports the receive time of every given request's message to the processing status recorder. */
    private void recordTimestamp(List<IndexingRequest> messageList) {
        for (IndexingRequest entry : messageList) {
            final Indexable message = entry.message();
            processingStatusRecorder.updatePostIndexingReceiveTime(message.getReceiveTime());
        }
    }

    /**
     * Converts the errors into index failures and offers them as one batch to the index failure queue.
     * Queuing is best effort: if the queue stays full for the whole timeout, or the thread is interrupted
     * while waiting, the batch is not queued and a warning is logged instead.
     *
     * @param indexingErrors errors to publish
     * @return the {@code letterId} of every resulting index failure; an empty list if there are no errors
     */
    private List<String> propagateFailure(Collection<IndexingError> indexingErrors) {
        if (indexingErrors.isEmpty()) {
            return Collections.emptyList();
        }

        final List<IndexFailure> indexFailures = indexingErrors.stream()
                .map(IndexingError::toIndexFailure)
                .collect(Collectors.toList());

        try {
            final boolean queued = indexFailureQueue.offer(indexFailures, INDEX_FAILURE_OFFER_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            if (!queued) {
                // offer() signals a full queue through its return value, not through an exception.
                LOG.warn("Couldn't save {} index failures, the index failure queue is full.", indexFailures.size());
            }
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still react to the interruption.
            Thread.currentThread().interrupt();
            LOG.warn("Couldn't save index failures.", e);
        }

        return indexFailures.stream()
                .map(IndexFailure::letterId)
                .collect(Collectors.toList());
    }

    /** Returns the queue to which batches of index failures are offered by the bulk-index methods. */
    public LinkedBlockingQueue<List<IndexFailure>> getIndexFailureQueue() {
        return indexFailureQueue;
    }

    /** Describes why a single message could not be indexed. */
    @AutoValue
    public static abstract class IndexingError {
        /** The message that failed to be indexed. */
        public abstract Indexable message();

        /** Name of the index involved in the failure. */
        public abstract String index();

        /** Category of the failure. */
        public abstract ErrorType errorType();

        /** Free-text description of the failure. */
        public abstract String errorMessage();

        /** Creates an error with all properties given explicitly. */
        public static IndexingError create(Indexable message, String index, ErrorType errorType, String errorMessage) {
            return new AutoValue_Messages_IndexingError(message, index, errorType, errorMessage);
        }

        /** Creates an error of type {@link ErrorType#Unknown} with an empty error message. */
        public static IndexingError create(Indexable message, String index) {
            return create(message, index, ErrorType.Unknown, "");
        }

        /**
         * Converts this error into an {@link IndexFailure} document with the keys {@code letter_id},
         * {@code index}, {@code type}, {@code message} and {@code timestamp}.
         */
        public IndexFailure toIndexFailure() {
            final Indexable message = message();
            final Map<String, Object> doc = ImmutableMap.<String, Object>builder()
                    .put("letter_id", message.getId())
                    .put("index", index())
                    .put("type", errorType().toString())
                    .put("message", errorMessage())
                    .put("timestamp", message.getTimestamp())
                    .build();

            return new IndexFailureImpl(doc);
        }

        /** Categories of indexing failures; only {@link #IndexBlocked} is retried by {@link Messages}. */
        public static enum ErrorType {
            IndexBlocked,
            MappingError,
            Unknown;

        }
    }
}

