package cz.o2.proxima.direct.io.elasticsearch;

import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.BulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.DocWriteRequest;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkProcessor;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkRequest;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkResponse;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.delete.DeleteRequest;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.index.IndexRequest;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.client.RequestOptions;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.client.RestHighLevelClient;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.common.unit.ByteSizeUnit;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.common.unit.ByteSizeValue;
import cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.xcontent.XContentType;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/direct/io/elasticsearch/ElasticsearchWriter.class */
public class ElasticsearchWriter implements BulkAttributeWriter {

    /** Class-wide SLF4J logger; also used by the nested listener. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);
    /** Accessor this writer was constructed with; source of the URI and index name. */
    private final ElasticsearchAccessor accessor;
    /** Formatter obtained from the accessor; produces document keys and JSON bodies. */
    private final DocumentFormatter formatter;

    /** Current bulk writer; replaced with a fresh instance by {@code rollback()}. */
    @VisibleForTesting
    BulkWriter writer;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:cz/o2/proxima/direct/io/elasticsearch/ElasticsearchWriter$BulkProcessorListener.class */
    public static class BulkProcessorListener implements BulkProcessor.Listener {
        private final NavigableMap<Long, CommitCallback> pendingCommits;
        private final NavigableMap<Long, CommitCallback> confirmedCommits;
        CommitCallback lastWrittenOffset;

        BulkProcessorListener(NavigableMap<Long, CommitCallback> navigableMap, NavigableMap<Long, CommitCallback> navigableMap2) {
            this.pendingCommits = navigableMap;
            this.confirmedCommits = navigableMap2;
        }

        @Override // cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public synchronized void beforeBulk(long j, BulkRequest bulkRequest) {
            ElasticsearchWriter.log.debug("Bulk starting with executionId: {}", Long.valueOf(j));
            this.pendingCommits.put(Long.valueOf(j), this.lastWrittenOffset);
        }

        @Override // cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            ElasticsearchWriter.log.debug("Bulk with executionId: {} finished successfully ", Long.valueOf(j));
            doCommit(j, !bulkResponse.hasFailures(), bulkResponse.hasFailures() ? new IllegalStateException(String.format("Failures detected in bulk %s", bulkResponse)) : null);
        }

        @Override // cz.o2.proxima.elasticsearch.shaded.org.elasticsearch.action.bulk.BulkProcessor.Listener
        public void afterBulk(long j, BulkRequest bulkRequest, Throwable th) {
            ElasticsearchWriter.log.warn(String.format("Bulk with executionId: %s finished with error", Long.valueOf(j)), th);
            doCommit(j, false, th);
        }

        @VisibleForTesting
        void doCommit(long j, boolean z, Throwable th) {
            CommitCallback commitCallback = (CommitCallback) this.pendingCommits.remove(Long.valueOf(j));
            if (commitCallback == null) {
                ElasticsearchWriter.log.warn("Missing commit callback for execution ID {}", Long.valueOf(j));
            } else {
                this.confirmedCommits.put(Long.valueOf(j), commitCallback);
                new ArrayList(this.confirmedCommits.headMap(Long.valueOf(this.pendingCommits.isEmpty() ? Long.MAX_VALUE : this.pendingCommits.firstKey().longValue())).entrySet()).forEach(entry -> {
                    this.confirmedCommits.remove(entry.getKey());
                    ((CommitCallback) entry.getValue()).commit(z, th);
                });
            }
        }

        @Generated
        public void setLastWrittenOffset(CommitCallback commitCallback) {
            this.lastWrittenOffset = commitCallback;
        }

        @Generated
        public CommitCallback getLastWrittenOffset() {
            return this.lastWrittenOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:cz/o2/proxima/direct/io/elasticsearch/ElasticsearchWriter$BulkWriter.class */
    /**
     * Sink for index and delete requests, each paired with the commit callback of the element
     * that produced it.
     */
    public interface BulkWriter extends Closeable {
        /**
         * Creates a writer backed by a {@link BulkProcessor} configured from the accessor (batch
         * size, concurrent requests, flush interval, bulk size in MB).
         *
         * @param elasticsearchAccessor accessor providing the REST client and bulk settings
         * @return writer that must be closed to release the processor and the client
         */
        static BulkWriter viaBulkProcessor(ElasticsearchAccessor elasticsearchAccessor) {
            final BulkProcessorListener listener = new BulkProcessorListener(new ConcurrentSkipListMap<>(), new ConcurrentSkipListMap<>());
            final RestHighLevelClient client = elasticsearchAccessor.getRestHighLevelClient();
            final BulkProcessor processor = BulkProcessor.builder(
                    (bulkRequest, actionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener),
                    listener,
                    "es-writer-" + elasticsearchAccessor.getIndexName())
                .setBulkActions(elasticsearchAccessor.getBatchSize())
                .setConcurrentRequests(elasticsearchAccessor.getConcurrentRequests())
                .setFlushInterval(elasticsearchAccessor.getFlushInterval())
                .setBulkSize(new ByteSizeValue(elasticsearchAccessor.getBulkSizeMb(), ByteSizeUnit.MB))
                .build();
            return new BulkWriter() { // from class: cz.o2.proxima.direct.io.elasticsearch.ElasticsearchWriter.BulkWriter.1
                @Override // cz.o2.proxima.direct.io.elasticsearch.ElasticsearchWriter.BulkWriter
                public void add(IndexRequest indexRequest, CommitCallback commitCallback) {
                    enqueue(() -> processor.add(indexRequest), commitCallback);
                }

                @Override // cz.o2.proxima.direct.io.elasticsearch.ElasticsearchWriter.BulkWriter
                public void add(DeleteRequest deleteRequest, CommitCallback commitCallback) {
                    enqueue(() -> processor.add(deleteRequest), commitCallback);
                }

                /**
                 * Records the callback and hands the request to the processor while holding the
                 * listener's monitor, which {@code beforeBulk} also takes.
                 *
                 * <p>The callback is recorded BEFORE the request is added: per the Elasticsearch
                 * BulkProcessor implementation, adding a request may start a bulk on the calling
                 * thread, and that bulk already contains this request. Recording afterwards would
                 * register the previous callback (or {@code null} on the very first add) for it.
                 */
                private void enqueue(Runnable addToProcessor, CommitCallback commitCallback) {
                    synchronized (listener) {
                        final CommitCallback previous = listener.getLastWrittenOffset();
                        listener.setLastWrittenOffset(commitCallback);
                        try {
                            addToProcessor.run();
                        } catch (RuntimeException e) {
                            // The request was not accepted; do not let a later bulk confirm it.
                            listener.setLastWrittenOffset(previous);
                            throw e;
                        }
                    }
                }

                @Override // java.io.Closeable, java.lang.AutoCloseable
                public void close() throws IOException {
                    try {
                        processor.close();
                    } finally {
                        // Release the client even when closing the processor fails.
                        client.close();
                    }
                }
            };
        }

        /**
         * Queues an index request.
         *
         * @param indexRequest request to queue
         * @param commitCallback callback associated with the request
         */
        void add(IndexRequest indexRequest, CommitCallback commitCallback);

        /**
         * Queues a delete request.
         *
         * @param deleteRequest request to queue
         * @param commitCallback callback associated with the request
         */
        void add(DeleteRequest deleteRequest, CommitCallback commitCallback);
    }

    public ElasticsearchWriter(ElasticsearchAccessor elasticsearchAccessor) {
        this.accessor = elasticsearchAccessor;
        this.formatter = elasticsearchAccessor.getDocumentFormatter();
        this.writer = createBulkWriter(elasticsearchAccessor);
    }

    /**
     * Builds the bulk writer used by this instance; package-visible for tests.
     *
     * @param elasticsearchAccessor accessor the bulk writer is created from
     * @return a writer created by {@link BulkWriter#viaBulkProcessor(ElasticsearchAccessor)}
     */
    @VisibleForTesting
    BulkWriter createBulkWriter(ElasticsearchAccessor elasticsearchAccessor) {
        final BulkWriter created = BulkWriter.viaBulkProcessor(elasticsearchAccessor);
        return created;
    }

    /**
     * Returns the URI reported by the accessor.
     *
     * @return URI of the accessor this writer was created with
     */
    public URI getUri() {
        final URI uri = this.accessor.getUri();
        return uri;
    }

    /**
     * Closes the current bulk writer and replaces it with a freshly created one.
     *
     * <p>A failure while closing is rethrown via {@code ExceptionUtils.unchecked}; in that case
     * the writer is not replaced.
     */
    public void rollback() {
        // The bound method reference evaluates and null-checks the receiver eagerly.
        ExceptionUtils.unchecked(this.writer::close);
        this.writer = createBulkWriter(this.accessor);
    }

    /**
     * Queues the element as an index request, or as a delete request when it is a delete.
     *
     * <p>Wildcard deletes are not supported: they are logged and dropped, and the callback is not
     * passed on by this method.
     *
     * @param streamElement element to write
     * @param j not used by this method
     * @param commitCallback callback handed to the bulk writer together with the request
     */
    public void write(StreamElement streamElement, long j, CommitCallback commitCallback) {
        if (!streamElement.isDelete()) {
            addIndexRequest(streamElement, commitCallback);
            return;
        }
        if (streamElement.isDeleteWildcard()) {
            log.warn("Wildcard deletes not supported. Got {}", streamElement);
            return;
        }
        addDeleteRequest(streamElement, commitCallback);
    }

    /**
     * Queues a delete of the document identified by the element's key in the accessor's index.
     *
     * @param streamElement element whose document should be deleted
     * @param commitCallback callback handed to the bulk writer together with the request
     */
    private void addDeleteRequest(StreamElement streamElement, CommitCallback commitCallback) {
        final DeleteRequest deletion =
            new DeleteRequest(this.accessor.getIndexName()).id(toEsKey(streamElement));
        this.writer.add(deletion, commitCallback);
    }

    /**
     * Queues an index operation that stores the element's JSON form under the element's key in
     * the accessor's index.
     *
     * @param streamElement element to index
     * @param commitCallback callback handed to the bulk writer together with the request
     */
    private void addIndexRequest(StreamElement streamElement, CommitCallback commitCallback) {
        final IndexRequest indexing =
            new IndexRequest(this.accessor.getIndexName())
                .id(toEsKey(streamElement))
                .opType(DocWriteRequest.OpType.INDEX)
                .source(toJson(streamElement), XContentType.JSON);
        this.writer.add(indexing, commitCallback);
    }

    /**
     * Derives the document ID for the element by delegating to the formatter.
     *
     * @param streamElement element to derive the key from
     * @return key produced by the formatter
     */
    @VisibleForTesting
    String toEsKey(StreamElement streamElement) {
        final String documentId = this.formatter.toKey(streamElement);
        return documentId;
    }

    /**
     * Renders the element as a JSON document by delegating to the formatter.
     *
     * @param streamElement element to render
     * @return JSON produced by the formatter
     */
    @VisibleForTesting
    String toJson(StreamElement streamElement) {
        final String document = this.formatter.toJson(streamElement);
        return document;
    }

    /**
     * Closes the underlying bulk writer.
     *
     * @throws IllegalStateException wrapping the {@link IOException} when closing fails
     */
    public void close() {
        try {
            this.writer.close();
        } catch (IOException closeFailure) {
            log.warn("Error closing writer.", closeFailure);
            throw new IllegalStateException(closeFailure);
        }
    }

    /* renamed from: asFactory, reason: merged with bridge method [inline-methods] */
    /**
     * Returns a factory that creates a new {@code ElasticsearchWriter} for this writer's accessor.
     *
     * <p>The accessor is copied to a local first so the lambda captures only the accessor and not
     * this writer instance.
     *
     * @return factory producing writers bound to the same accessor
     */
    public BulkAttributeWriter.Factory<? extends BulkAttributeWriter> m27asFactory() {
        final ElasticsearchAccessor captured = this.accessor;
        return repository -> new ElasticsearchWriter(captured);
    }

    /**
     * Synthetic hook (marked as such in the decompiled output) that recreates this class's
     * serialized lambdas from their {@link SerializedLambda} description.
     *
     * <p>Two implementation methods are recognized: the factory lambda created in
     * {@code asFactory} and the bound {@code close} method reference. Every recorded
     * attribute of the serialized form must match before the lambda is rebuilt.
     *
     * <p>NOTE(review): this is decompiler output of compiler-generated code and is not expected
     * to be maintained by hand; the {@code boolean}-typed switch below is a decompilation artifact.
     *
     * @param serializedLambda serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException when the description matches no known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to a selector (hash, then equals).
        switch (implMethodName.hashCode()) {
            case 94756344:
                if (implMethodName.equals("close")) {
                    z = true;
                    break;
                }
                break;
            case 1522138028:
                if (implMethodName.equals("lambda$asFactory$349bf585$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full signature and rebuild the matching lambda.
        switch (z) {
            case false:
                // Factory lambda: captured argument 0 is the accessor.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/direct/core/BulkAttributeWriter$Factory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/o2/proxima/direct/io/elasticsearch/ElasticsearchWriter") && serializedLambda.getImplMethodSignature().equals("(Lcz/o2/proxima/direct/io/elasticsearch/ElasticsearchAccessor;Lcz/o2/proxima/core/repository/Repository;)Lcz/o2/proxima/direct/core/BulkAttributeWriter;")) {
                    ElasticsearchAccessor elasticsearchAccessor = (ElasticsearchAccessor) serializedLambda.getCapturedArg(0);
                    return repository -> {
                        return new ElasticsearchWriter(elasticsearchAccessor);
                    };
                }
                break;
            case true:
                // Bound close reference: captured argument 0 is the bulk writer to close.
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("cz/o2/proxima/core/util/ExceptionUtils$ThrowingRunnable") && serializedLambda.getFunctionalInterfaceMethodName().equals("run") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("java/io/Closeable") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    BulkWriter bulkWriter = (BulkWriter) serializedLambda.getCapturedArg(0);
                    return bulkWriter::close;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
