/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.elasticsearch;

import com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.direct.core.BulkAttributeWriter;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.elasticsearch.DocumentFormatter;
import cz.o2.proxima.direct.elasticsearch.ElasticsearchAccessor;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.util.ExceptionUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import lombok.Generated;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BulkAttributeWriter} that turns stream elements into Elasticsearch index / delete
 * requests and hands them to a {@link BulkWriter}.
 *
 * <p>Commit callbacks are not invoked per element. The callback of the most recently added
 * element is attached to the bulk that gets flushed, and it is invoked once that bulk and all
 * bulks started before it have finished (see {@link BulkProcessorListener}).
 */
public class ElasticsearchWriter
implements BulkAttributeWriter {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchWriter.class);
    private final ElasticsearchAccessor accessor;
    private final DocumentFormatter formatter;
    @VisibleForTesting
    BulkWriter writer;

    /**
     * Creates the writer and its underlying bulk writer.
     *
     * @param accessor source of the index name, document formatter and bulk settings
     */
    public ElasticsearchWriter(ElasticsearchAccessor accessor) {
        this.accessor = accessor;
        this.formatter = accessor.getDocumentFormatter();
        // Goes through the overridable factory method so that tests can substitute the writer.
        this.writer = this.createBulkWriter(accessor);
    }

    /**
     * Factory method for the bulk writer used by this instance.
     *
     * @param accessor accessor to configure the bulk writer from
     * @return a new bulk writer
     */
    @VisibleForTesting
    BulkWriter createBulkWriter(ElasticsearchAccessor accessor) {
        return BulkWriter.viaBulkProcessor(accessor);
    }

    /** @return the URI reported by the accessor */
    @Override
    public URI getUri() {
        return this.accessor.getUri();
    }

    /** Closes the current bulk writer and replaces it with a freshly created one. */
    @Override
    public void rollback() {
        ExceptionUtils.unchecked(this.writer::close);
        this.writer = this.createBulkWriter(this.accessor);
    }

    /**
     * Queues the element as an index request, or as a delete request when it is a delete.
     *
     * <p>Wildcard deletes are not supported: they are logged and dropped.
     *
     * @param element element to write
     * @param watermark not used by this writer
     * @param commitCallback callback to confirm the write with
     */
    @Override
    public void write(StreamElement element, long watermark, CommitCallback commitCallback) {
        if (!element.isDelete()) {
            this.addIndexRequest(element, commitCallback);
            return;
        }
        if (element.isDeleteWildcard()) {
            // NOTE(review): the callback of a dropped wildcard delete is never invoked here;
            // confirm that callers do not wait for it.
            log.warn("Wildcard deletes not supported. Got {}", element);
            return;
        }
        this.addDeleteRequest(element, commitCallback);
    }

    /** Queues a delete of the document identified by the element's key. */
    private void addDeleteRequest(StreamElement element, CommitCallback commitCallback) {
        DeleteRequest request = new DeleteRequest(this.accessor.getIndexName()).id(this.toEsKey(element));
        this.writer.add(request, commitCallback);
    }

    /** Queues an index operation carrying the element's JSON representation. */
    private void addIndexRequest(StreamElement element, CommitCallback commitCallback) {
        IndexRequest request = new IndexRequest(this.accessor.getIndexName())
            .id(this.toEsKey(element))
            .opType(DocWriteRequest.OpType.INDEX)
            .source(this.toJson(element), XContentType.JSON);
        this.writer.add(request, commitCallback);
    }

    /** @return document id produced by the formatter for the given element */
    @VisibleForTesting
    String toEsKey(StreamElement element) {
        return this.formatter.toKey(element);
    }

    /** @return JSON document produced by the formatter for the given element */
    @VisibleForTesting
    String toJson(StreamElement element) {
        return this.formatter.toJson(element);
    }

    /**
     * Closes the bulk writer.
     *
     * @throws IllegalStateException wrapping the {@link IOException} when closing fails
     */
    @Override
    public void close() {
        try {
            this.writer.close();
        }
        catch (IOException e) {
            log.warn("Error closing writer.", e);
            throw new IllegalStateException(e);
        }
    }

    /** @return a factory that creates a new writer from this writer's accessor */
    @Override
    public BulkAttributeWriter.Factory<? extends BulkAttributeWriter> asFactory() {
        // Local copy so that the lambda captures the accessor only, not this writer.
        ElasticsearchAccessor accessor = this.accessor;
        return (BulkAttributeWriter.Factory & Serializable)repo -> new ElasticsearchWriter(accessor);
    }

    /** Sink for index / delete requests, each paired with the commit callback of its element. */
    @VisibleForTesting
    interface BulkWriter
    extends Closeable {
        /**
         * Creates a writer backed by a {@link BulkProcessor} configured from the accessor
         * (batch size, concurrent requests, flush interval, bulk size in MB).
         *
         * @param accessor source of the client and of the bulk settings
         * @return the bulk writer; closing it closes the processor and the client
         */
        static BulkWriter viaBulkProcessor(ElasticsearchAccessor accessor) {
            NavigableMap<Long, CommitCallback> pendingCommits = new ConcurrentSkipListMap<>();
            NavigableMap<Long, CommitCallback> confirmedCommits = new ConcurrentSkipListMap<>();
            final BulkProcessorListener listener = new BulkProcessorListener(pendingCommits, confirmedCommits);
            final RestHighLevelClient client = accessor.getRestHighLevelClient();
            final BulkProcessor processor = BulkProcessor
                .builder(
                    (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                    listener,
                    "es-writer-" + accessor.getIndexName())
                .setBulkActions(accessor.getBatchSize())
                .setConcurrentRequests(accessor.getConcurrentRequests())
                .setFlushInterval(accessor.getFlushInterval())
                .setBulkSize(new ByteSizeValue(accessor.getBulkSizeMb(), ByteSizeUnit.MB))
                .build();
            return new BulkWriter(){

                @Override
                public void add(IndexRequest request, CommitCallback commit) {
                    this.addTracked(commit, () -> processor.add(request));
                }

                @Override
                public void add(DeleteRequest request, CommitCallback commit) {
                    this.addTracked(commit, () -> processor.add(request));
                }

                /**
                 * Publishes {@code commit} as the last written offset and then enqueues the request.
                 *
                 * <p>The offset is set BEFORE enqueueing: when the enqueue itself triggers a flush,
                 * {@code beforeBulk} runs inside that call and must already see the callback of
                 * the request that is part of the flushed bulk.
                 *
                 * NOTE(review): the listener monitor is held while calling into the processor;
                 * confirm this cannot invert lock order with the processor's own flush thread,
                 * which calls the synchronized {@code beforeBulk}.
                 */
                private void addTracked(CommitCallback commit, Runnable enqueue) {
                    synchronized (listener) {
                        CommitCallback previous = listener.getLastWrittenOffset();
                        listener.setLastWrittenOffset(commit);
                        try {
                            enqueue.run();
                        }
                        catch (RuntimeException | Error e) {
                            // The request was not accepted, so its callback must not be confirmed
                            // by a later bulk.
                            listener.setLastWrittenOffset(previous);
                            throw e;
                        }
                    }
                }

                @Override
                public void close() throws IOException {
                    try {
                        processor.close();
                    }
                    finally {
                        // Release the client even when closing the processor fails.
                        client.close();
                    }
                }
            };
        }

        /**
         * Queues an index request.
         *
         * @param request request to queue
         * @param commit commit callback of the element the request was built from
         */
        void add(IndexRequest request, CommitCallback commit);

        /**
         * Queues a delete request.
         *
         * @param request request to queue
         * @param commit commit callback of the element the request was built from
         */
        void add(DeleteRequest request, CommitCallback commit);
    }

    /**
     * Tracks one commit callback per bulk execution and invokes callbacks in ascending
     * execution-id order, only once no bulk with a lower execution id is still pending.
     */
    @VisibleForTesting
    static class BulkProcessorListener
    implements BulkProcessor.Listener {
        /** Callbacks of bulks that have started but not finished, keyed by execution id. */
        private final NavigableMap<Long, CommitCallback> pendingCommits;
        /** Callbacks of finished bulks that still wait for an earlier bulk to finish. */
        private final NavigableMap<Long, CommitCallback> confirmedCommits;
        /** Result of every finished bulk whose callback has not been invoked yet. */
        private final Map<Long, Outcome> outcomes = new ConcurrentSkipListMap<>();
        CommitCallback lastWrittenOffset;

        BulkProcessorListener(NavigableMap<Long, CommitCallback> pendingCommits, NavigableMap<Long, CommitCallback> confirmedCommits) {
            this.pendingCommits = pendingCommits;
            this.confirmedCommits = confirmedCommits;
        }

        /** Registers the last written offset as the callback of the bulk that is about to start. */
        @Override
        public synchronized void beforeBulk(long executionId, BulkRequest request) {
            log.debug("Bulk starting with executionId: {}", executionId);
            CommitCallback callback = this.lastWrittenOffset;
            if (callback == null) {
                // A null value would be rejected by ConcurrentSkipListMap with an NPE; doCommit
                // already tolerates (and logs) an execution without a registered callback.
                log.warn("No commit callback recorded before bulk with executionId: {}", executionId);
                return;
            }
            this.pendingCommits.put(executionId, callback);
        }

        /** Commits the bulk as failed when the response reports failures, as succeeded otherwise. */
        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            if (bulkResponse.hasFailures()) {
                log.warn("Bulk with executionId: {} finished with failures: {}", executionId, bulkResponse.buildFailureMessage());
                this.doCommit(executionId, false, new IllegalStateException(String.format("Failures detected in bulk %s", bulkResponse)));
                return;
            }
            log.debug("Bulk with executionId: {} finished successfully ", executionId);
            this.doCommit(executionId, true, null);
        }

        /** Commits the bulk as failed with the given error. */
        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {
            log.warn("Bulk with executionId: {} finished with error", executionId, failure);
            this.doCommit(executionId, false, failure);
        }

        /**
         * Marks the execution as finished and invokes every callback that is no longer blocked
         * by a pending execution with a lower id.
         *
         * <p>Each callback receives the result of its own bulk. A failure additionally fails
         * every later execution drained in the same call, as the original code did. What is
         * new is that a failed bulk is never reported as successful just because an earlier
         * bulk finished successfully after it.
         *
         * @param executionId id of the finished bulk
         * @param succ whether the bulk succeeded
         * @param err failure of the bulk, {@code null} on success
         */
        @VisibleForTesting
        void doCommit(long executionId, boolean succ, Throwable err) {
            CommitCallback currentCallback = this.pendingCommits.remove(executionId);
            if (currentCallback == null) {
                log.warn("Missing commit callback for execution ID {}", executionId);
                return;
            }
            Outcome current = new Outcome(succ, err);
            // Store the outcome before the callback becomes visible in confirmedCommits, so
            // that a concurrent drain always finds it.
            this.outcomes.put(executionId, current);
            this.confirmedCommits.put(executionId, currentCallback);

            // Single firstEntry() call: isEmpty() followed by firstKey() could fail when another
            // thread removes the last pending execution in between.
            Map.Entry<Long, CommitCallback> oldestPending = this.pendingCommits.firstEntry();
            long uncommittedExecutionId = oldestPending == null ? Long.MAX_VALUE : oldestPending.getKey();

            // Snapshot of the keys, because entries are removed while being processed.
            ArrayList<Long> ready = new ArrayList<>(this.confirmedCommits.headMap(uncommittedExecutionId).keySet());
            Outcome failure = null;
            for (Long readyId : ready) {
                CommitCallback callback = this.confirmedCommits.remove(readyId);
                if (callback == null) {
                    // Another thread draining concurrently has already delivered this one.
                    continue;
                }
                Outcome own = this.outcomes.remove(readyId);
                if (own == null) {
                    // Entry that did not pass through this method (e.g. a map pre-populated by
                    // the caller): fall back to the result of the current execution.
                    own = current;
                }
                if (failure == null && !own.succeeded) {
                    failure = own;
                }
                Outcome reported = failure != null ? failure : own;
                callback.commit(reported.succeeded, reported.error);
            }
        }

        @Generated
        public void setLastWrittenOffset(CommitCallback lastWrittenOffset) {
            this.lastWrittenOffset = lastWrittenOffset;
        }

        @Generated
        public CommitCallback getLastWrittenOffset() {
            return this.lastWrittenOffset;
        }

        /** Result of one finished bulk execution. */
        private static final class Outcome {
            private final boolean succeeded;
            private final Throwable error;

            Outcome(boolean succeeded, Throwable error) {
                this.succeeded = succeeded;
                this.error = error;
            }
        }
    }
}

