/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.search.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.JsonType;
import io.fluxcapacitor.common.api.Request;
import io.fluxcapacitor.common.api.search.BulkUpdateDocuments;
import io.fluxcapacitor.common.api.search.CreateAuditTrail;
import io.fluxcapacitor.common.api.search.DeleteCollection;
import io.fluxcapacitor.common.api.search.DeleteDocumentById;
import io.fluxcapacitor.common.api.search.DeleteDocuments;
import io.fluxcapacitor.common.api.search.DocumentStats;
import io.fluxcapacitor.common.api.search.GetDocument;
import io.fluxcapacitor.common.api.search.GetDocumentResult;
import io.fluxcapacitor.common.api.search.GetDocumentStats;
import io.fluxcapacitor.common.api.search.GetDocumentStatsResult;
import io.fluxcapacitor.common.api.search.GetSearchHistogram;
import io.fluxcapacitor.common.api.search.GetSearchHistogramResult;
import io.fluxcapacitor.common.api.search.IndexDocuments;
import io.fluxcapacitor.common.api.search.SearchDocuments;
import io.fluxcapacitor.common.api.search.SearchDocumentsResult;
import io.fluxcapacitor.common.api.search.SearchHistogram;
import io.fluxcapacitor.common.api.search.SearchQuery;
import io.fluxcapacitor.common.api.search.SerializedDocument;
import io.fluxcapacitor.common.api.search.SerializedDocumentUpdate;
import io.fluxcapacitor.common.search.Document;
import io.fluxcapacitor.javaclient.common.websocket.AbstractWebsocketClient;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import io.fluxcapacitor.javaclient.persisting.search.SearchHit;
import io.fluxcapacitor.javaclient.persisting.search.client.SearchClient;
import java.net.URI;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.websocket.ClientEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SearchClient} implementation that talks to a search endpoint over a websocket using the
 * inherited {@code send}, {@code sendAndWait} and {@code sendAndForget} methods.
 *
 * <p>Documents to index and bulk updates are collected in {@link Backlog} instances before being
 * sent. Which backlog (or direct request) is used depends on the {@link Guarantee} passed by the
 * caller:
 * <ul>
 *   <li>{@code NONE} – the work is handed off and {@link Awaitable#ready()} is returned;</li>
 *   <li>{@code SENT} – for indexing and bulk updates, the {@link Awaitable} of the fire-and-forget
 *       backlog is returned;</li>
 *   <li>{@code STORED} – the returned {@link Awaitable} blocks on the response future of the
 *       request.</li>
 * </ul>
 */
@ClientEndpoint
public class WebSocketSearchClient extends AbstractWebsocketClient implements SearchClient {
    private static final Logger log = LoggerFactory.getLogger(WebSocketSearchClient.class);

    /** Largest number of documents requested from the endpoint in a single search round trip. */
    private static final int MAX_FETCH_SIZE = 10000;

    private final Backlog<Document> sendBacklog =
            new Backlog<>(documents -> sendValues(documents, false));
    private final Backlog<Document> sendIfNotExistsBacklog =
            new Backlog<>(documents -> sendValues(documents, true));
    private final Backlog<Document> storeBacklog =
            new Backlog<>(documents -> storeValues(documents, false));
    private final Backlog<Document> storeIfNotExistsBacklog =
            new Backlog<>(documents -> storeValues(documents, true));
    private final Backlog<SerializedDocumentUpdate> batchBacklog =
            new Backlog<>(actions -> sendAndForget(new BulkUpdateDocuments(actions, Guarantee.SENT)));

    /**
     * Creates a client for the endpoint at the given URL.
     *
     * @param endPointUrl URL of the search endpoint; parsed with {@link URI#create(String)}
     * @param properties  client properties, forwarded to the URI-based constructor
     */
    public WebSocketSearchClient(String endPointUrl, WebSocketClient.Properties properties) {
        this(URI.create(endPointUrl), properties);
    }

    /**
     * Creates a client for the endpoint at the given URI.
     *
     * @param endpointUri URI of the search endpoint
     * @param properties  client properties; {@code getSearchSessions()} is passed on to the superclass
     */
    public WebSocketSearchClient(URI endpointUri, WebSocketClient.Properties properties) {
        super(endpointUri, properties, true, properties.getSearchSessions());
    }

    /**
     * Sends the given documents in a single {@link IndexDocuments} message with guarantee
     * {@code SENT}, without registering for a response.
     *
     * @param documents   documents to index
     * @param ifNotExists flag forwarded to {@link IndexDocuments}
     * @return the {@link Awaitable} returned by {@code sendAndForget}
     */
    protected Awaitable sendValues(List<Document> documents, boolean ifNotExists) {
        return sendAndForget(new IndexDocuments(serialize(documents), ifNotExists, Guarantee.SENT));
    }

    /**
     * Sends the given documents in a single {@link IndexDocuments} request with guarantee
     * {@code STORED}.
     *
     * @param documents   documents to index
     * @param ifNotExists flag forwarded to {@link IndexDocuments}
     * @return an {@link Awaitable} that blocks on the response future of the request
     */
    protected Awaitable storeValues(List<Document> documents, boolean ifNotExists) {
        return awaitResponse(new IndexDocuments(serialize(documents), ifNotExists, Guarantee.STORED));
    }

    /**
     * Queues the given documents for indexing.
     *
     * @param documents   documents to index
     * @param guarantee   delivery guarantee that selects the backlog and the returned awaitable
     * @param ifNotExists selects the "if not exists" variant of the backlog
     * @return {@link Awaitable#ready()} for {@code NONE}, otherwise the awaitable returned by the
     *         selected backlog
     * @throws UnsupportedOperationException if the guarantee is not one of NONE, SENT or STORED
     */
    @Override
    public Awaitable index(List<Document> documents, Guarantee guarantee, boolean ifNotExists) {
        Backlog<Document> sendQueue = ifNotExists ? sendIfNotExistsBacklog : sendBacklog;
        switch (guarantee) {
            case NONE:
                // Same queue as SENT, but the caller does not wait for the outcome.
                sendQueue.add(documents);
                return Awaitable.ready();
            case SENT:
                return sendQueue.add(documents);
            case STORED:
                return (ifNotExists ? storeIfNotExistsBacklog : storeBacklog).add(documents);
            default:
                throw new UnsupportedOperationException("Unrecognized guarantee: " + guarantee);
        }
    }

    /**
     * Applies a batch of document updates.
     *
     * <p>For {@code NONE} and {@code SENT} the updates go through the batch backlog (which sends
     * them with guarantee {@code SENT}); for {@code STORED} a dedicated
     * {@link BulkUpdateDocuments} request is sent directly.
     *
     * @param batch     updates to apply
     * @param guarantee delivery guarantee
     * @return {@link Awaitable#ready()} for {@code NONE}, the backlog's awaitable for {@code SENT},
     *         or an awaitable blocking on the response future for {@code STORED}
     * @throws UnsupportedOperationException if the guarantee is not one of NONE, SENT or STORED
     */
    @Override
    public Awaitable bulkUpdate(Collection<SerializedDocumentUpdate> batch, Guarantee guarantee) {
        switch (guarantee) {
            case NONE:
                batchBacklog.add(batch);
                return Awaitable.ready();
            case SENT:
                return batchBacklog.add(batch);
            case STORED:
                return awaitResponse(new BulkUpdateDocuments(batch, guarantee));
            default:
                throw new UnsupportedOperationException("Unrecognized guarantee: " + guarantee);
        }
    }

    /**
     * Runs a search and returns the matches as a stream of hits.
     *
     * <p>Results are requested in pages of at most {@value #MAX_FETCH_SIZE} documents. The first
     * page is requested before this method returns; each following request passes the previous
     * page's {@code lastMatch()} as {@code lastHit}. Paging stops once a page holds fewer documents
     * than requested or, when the query has a maximum size, once that many documents were fetched.
     *
     * @param searchDocuments the search request; its {@code maxSize} may be {@code null}
     * @return stream of hits, limited to {@code maxSize} elements when one is set; each hit
     *         deserializes its document through the supplier handed to {@link SearchHit}
     */
    @Override
    public Stream<SearchHit<Document>> search(SearchDocuments searchDocuments) {
        // Number of documents received so far; only maintained when maxSize is set.
        AtomicInteger fetched = new AtomicInteger();
        Integer maxSize = searchDocuments.getMaxSize();
        int fetchBatchSize = maxSize == null ? MAX_FETCH_SIZE : Math.min(maxSize, MAX_FETCH_SIZE);
        SearchDocuments request = searchDocuments.toBuilder().maxSize(fetchBatchSize).build();

        SearchDocumentsResult firstPage = (SearchDocumentsResult) sendAndWait(request);
        Stream<SerializedDocument> documentStream = ObjectUtils.iterate(
                firstPage,
                // Next page: continue after the last match, never asking for more than is still needed.
                previous -> (SearchDocumentsResult) sendAndWait(request.toBuilder()
                        .maxSize(maxSize == null
                                         ? fetchBatchSize
                                         : Math.min(maxSize - fetched.get(), fetchBatchSize))
                        .lastHit(previous.lastMatch())
                        .build()),
                // Break condition: a short page, or the requested maximum has been reached.
                // The counter is updated here, as a side effect of evaluating the condition.
                page -> page.size() < fetchBatchSize
                        || (maxSize != null && fetched.addAndGet(page.size()) >= maxSize))
                .flatMap(page -> page.getMatches().stream());

        if (maxSize != null) {
            documentStream = documentStream.limit(maxSize);
        }
        return documentStream.map(WebSocketSearchClient::toSearchHit);
    }

    /**
     * Fetches a single document.
     *
     * @param request the lookup request
     * @return the deserialized document, or empty if the result contains no document
     */
    @Override
    public Optional<Document> get(GetDocument request) {
        GetDocumentResult result = (GetDocumentResult) sendAndWait(request);
        return Optional.ofNullable(result.getDocument()).map(SerializedDocument::deserializeDocument);
    }

    /**
     * Requests document statistics for a query.
     *
     * @param query   query selecting the documents
     * @param fields  fields forwarded to {@link GetDocumentStats}
     * @param groupBy grouping forwarded to {@link GetDocumentStats}
     * @return the statistics contained in the result
     */
    @Override
    public List<DocumentStats> getStatistics(SearchQuery query, List<String> fields, List<String> groupBy) {
        GetDocumentStatsResult result =
                (GetDocumentStatsResult) sendAndWait(new GetDocumentStats(query, fields, groupBy));
        return result.getDocumentStats();
    }

    /**
     * Requests a search histogram.
     *
     * @param request the histogram request
     * @return the histogram contained in the result
     */
    @Override
    public SearchHistogram getHistogram(GetSearchHistogram request) {
        GetSearchHistogramResult result = (GetSearchHistogramResult) sendAndWait(request);
        return result.getHistogram();
    }

    /**
     * Deletes all documents matching the given query.
     *
     * @param query     query selecting the documents to delete
     * @param guarantee delivery guarantee
     * @return {@link Awaitable#ready()} for {@code NONE} and {@code SENT}, or an awaitable blocking
     *         on the response future for {@code STORED}
     * @throws UnsupportedOperationException if the guarantee is not one of NONE, SENT or STORED
     */
    @Override
    public Awaitable delete(SearchQuery query, Guarantee guarantee) {
        return sendDelete(new DeleteDocuments(query, guarantee), guarantee);
    }

    /**
     * Deletes a single document by id.
     *
     * @param documentId id of the document
     * @param collection collection holding the document
     * @param guarantee  delivery guarantee
     * @return {@link Awaitable#ready()} for {@code NONE} and {@code SENT}, or an awaitable blocking
     *         on the response future for {@code STORED}
     * @throws UnsupportedOperationException if the guarantee is not one of NONE, SENT or STORED
     */
    @Override
    public Awaitable delete(String documentId, String collection, Guarantee guarantee) {
        return sendDelete(new DeleteDocumentById(collection, documentId, guarantee), guarantee);
    }

    /**
     * Deletes an entire collection.
     *
     * @param collection name of the collection
     * @return an awaitable blocking on the response future of the request
     */
    @Override
    public Awaitable deleteCollection(String collection) {
        return awaitResponse(new DeleteCollection(collection));
    }

    /**
     * Sends a request to create an audit trail.
     *
     * @param request the request to send
     * @return an awaitable blocking on the response future of the request
     */
    @Override
    public Awaitable createAuditTrail(CreateAuditTrail request) {
        return awaitResponse(request);
    }

    /** Dispatches a delete request according to the given guarantee. */
    private Awaitable sendDelete(Request request, Guarantee guarantee) {
        switch (guarantee) {
            case NONE:
                sendAndForget(request);
                return Awaitable.ready();
            case SENT:
                // NOTE(review): the future returned by send() is discarded, so the returned
                // Awaitable is already complete and does not wait for the request to be sent.
                // Confirm this is intended (index/bulkUpdate return the backlog's awaitable for SENT).
                send(request);
                return Awaitable.ready();
            case STORED:
                return awaitResponse(request);
            default:
                throw new UnsupportedOperationException("Unrecognized guarantee: " + guarantee);
        }
    }

    /** Sends the request immediately and returns an awaitable that blocks on its response future. */
    private Awaitable awaitResponse(Request request) {
        CompletableFuture<?> future = send(request);
        return future::get;
    }

    /** Wraps every document in a {@link SerializedDocument}. */
    private static List<SerializedDocument> serialize(List<Document> documents) {
        return documents.stream().map(SerializedDocument::new).collect(Collectors.toList());
    }

    /** Converts a serialized match into a hit; the document itself is deserialized via the supplier. */
    private static SearchHit<Document> toSearchHit(SerializedDocument document) {
        return new SearchHit<Document>(document.getId(), document.getCollection(),
                                       toInstant(document.getTimestamp()), toInstant(document.getEnd()),
                                       () -> document.deserializeDocument());
    }

    /** Converts epoch milliseconds to an {@link Instant}, mapping {@code null} to {@code null}. */
    private static Instant toInstant(Long epochMillis) {
        return epochMillis == null ? null : Instant.ofEpochMilli(epochMillis);
    }
}

