package io.fluxcapacitor.javaclient.persisting.eventsourcing.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.eventsourcing.AppendEvents;
import io.fluxcapacitor.common.api.eventsourcing.DeleteEvents;
import io.fluxcapacitor.common.api.eventsourcing.EventBatch;
import io.fluxcapacitor.common.api.eventsourcing.GetEvents;
import io.fluxcapacitor.common.api.eventsourcing.GetEventsResult;
import io.fluxcapacitor.common.api.modeling.GetAggregateIds;
import io.fluxcapacitor.common.api.modeling.GetAggregateIdsResult;
import io.fluxcapacitor.common.api.modeling.GetRelationships;
import io.fluxcapacitor.common.api.modeling.GetRelationshipsResult;
import io.fluxcapacitor.common.api.modeling.Relationship;
import io.fluxcapacitor.common.api.modeling.RepairRelationships;
import io.fluxcapacitor.common.api.modeling.UpdateRelationships;
import io.fluxcapacitor.javaclient.common.websocket.AbstractWebsocketClient;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import io.fluxcapacitor.javaclient.persisting.eventsourcing.AggregateEventStream;
import jakarta.websocket.ClientEndpoint;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

/**
 * {@link EventStoreClient} implementation that talks to the event store over a websocket connection.
 *
 * <p>Every operation is translated into a request object and handed to {@code sendCommand} or
 * {@code sendAndWait}; neither is declared in this class (presumably both are inherited from
 * {@link AbstractWebsocketClient} — confirm there for delivery and blocking semantics).
 */
@ClientEndpoint
public class WebSocketEventStoreClient extends AbstractWebsocketClient implements EventStoreClient {
    /** Batch size used when the caller does not specify one. */
    private static final int DEFAULT_FETCH_BATCH_SIZE = 8192;

    /** Batch size passed along with every {@link GetEvents} request issued by {@link #getEvents}. */
    private final int fetchBatchSize;

    /**
     * Creates a client for the given endpoint using the default fetch batch size.
     *
     * @param str          endpoint location; parsed with {@link URI#create(String)}
     * @param clientConfig client configuration forwarded to the superclass
     */
    public WebSocketEventStoreClient(String str, WebSocketClient.ClientConfig clientConfig) {
        this(URI.create(str), DEFAULT_FETCH_BATCH_SIZE, clientConfig);
    }

    /**
     * Creates a client for the given endpoint, passing {@code true} for the boolean flag of the
     * four-argument constructor.
     *
     * @param uri          endpoint location
     * @param i            batch size to request when fetching events
     * @param clientConfig client configuration forwarded to the superclass
     */
    public WebSocketEventStoreClient(URI uri, int i, WebSocketClient.ClientConfig clientConfig) {
        this(uri, i, clientConfig, true);
    }

    /**
     * Creates a client for the given endpoint.
     *
     * @param uri          endpoint location
     * @param i            batch size to request when fetching events
     * @param clientConfig client configuration; its {@code getEventSourcingSessions()} value is also
     *                     forwarded to the superclass
     * @param z            flag forwarded unchanged to the superclass constructor
     *                     (NOTE(review): meaning is defined by {@link AbstractWebsocketClient} — confirm there)
     */
    public WebSocketEventStoreClient(URI uri, int i, WebSocketClient.ClientConfig clientConfig, boolean z) {
        super(uri, clientConfig, z, clientConfig.getEventSourcingSessions());
        this.fetchBatchSize = i;
    }

    /**
     * Sends an {@link AppendEvents} command containing a single {@link EventBatch}.
     *
     * @param str       first argument of the {@link EventBatch} (presumably the aggregate id — confirm)
     * @param list      events to put in the batch
     * @param z         flag forwarded unchanged to the {@link EventBatch} constructor
     * @param guarantee guarantee forwarded with the command
     * @return the {@link Awaitable} returned by {@code sendCommand}
     */
    @Override
    public Awaitable storeEvents(String str, List<SerializedMessage> list, boolean z, Guarantee guarantee) {
        EventBatch batch = new EventBatch(str, list, z);
        return sendCommand(new AppendEvents(List.of(batch), guarantee));
    }

    /**
     * Returns a stream of events obtained through one or more {@link GetEvents} requests.
     *
     * <p>The first request is sent eagerly, using {@code j} as the sequence number argument. Follow-up
     * requests use the {@code lastSequenceNumber} reported by the preceding result. The predicate handed
     * to {@link ObjectUtils#iterate} matches a result that holds fewer events than the configured batch
     * size (presumably that ends the iteration after the matching result — confirm in {@code ObjectUtils}).
     *
     * @param str first argument of each {@link GetEvents} request and second argument of the returned
     *            stream (presumably the aggregate id — confirm)
     * @param j   sequence number passed with the first request
     * @return an {@link AggregateEventStream} over the fetched events; its supplier yields the
     *         {@code lastSequenceNumber} of the most recently processed non-empty batch, or
     *         {@code null} if no such batch has been processed yet
     */
    @Override
    public AggregateEventStream<SerializedMessage> getEvents(String str, long j) {
        // Updated as a side effect while the stream is consumed; stays null until a non-empty batch is seen.
        AtomicReference<Long> highestSequenceNumber = new AtomicReference<>();
        GetEventsResult firstResult = (GetEventsResult) sendAndWait(new GetEvents(str, j, fetchBatchSize));
        Stream<SerializedMessage> events = ObjectUtils.iterate(
                        firstResult,
                        previous -> (GetEventsResult) sendAndWait(
                                new GetEvents(str, previous.getLastSequenceNumber(), fetchBatchSize)),
                        result -> result.getEventBatch().getEvents().size() < fetchBatchSize)
                .flatMap(result -> {
                    if (!result.getEventBatch().isEmpty()) {
                        highestSequenceNumber.set(result.getLastSequenceNumber());
                    }
                    return result.getEventBatch().getEvents().stream();
                });
        return new AggregateEventStream<>(events, str, highestSequenceNumber::get);
    }

    /**
     * Sends the given relationship update as a command.
     *
     * @param updateRelationships request to send
     * @return the {@link Awaitable} returned by {@code sendCommand}
     */
    @Override
    public Awaitable updateRelationships(UpdateRelationships updateRelationships) {
        return sendCommand(updateRelationships);
    }

    /**
     * Sends the given relationship repair request as a command.
     *
     * @param repairRelationships request to send
     * @return the {@link Awaitable} returned by {@code sendCommand}
     */
    @Override
    public Awaitable repairRelationships(RepairRelationships repairRelationships) {
        return sendCommand(repairRelationships);
    }

    /**
     * Sends the given request via {@code sendAndWait} and unwraps the aggregate ids from the result.
     *
     * @param getAggregateIds request to send
     * @return the map exposed by {@link GetAggregateIdsResult#getAggregateIds()}
     */
    @Override
    public Map<String, String> getAggregateIds(GetAggregateIds getAggregateIds) {
        GetAggregateIdsResult result = (GetAggregateIdsResult) sendAndWait(getAggregateIds);
        return result.getAggregateIds();
    }

    /**
     * Sends the given request via {@code sendAndWait} and unwraps the relationships from the result.
     *
     * @param getRelationships request to send
     * @return the list exposed by {@link GetRelationshipsResult#getRelationships()}
     */
    @Override
    public List<Relationship> getRelationships(GetRelationships getRelationships) {
        GetRelationshipsResult result = (GetRelationshipsResult) sendAndWait(getRelationships);
        return result.getRelationships();
    }

    /**
     * Sends a {@link DeleteEvents} command.
     *
     * @param str       first argument of the {@link DeleteEvents} command (presumably the aggregate id — confirm)
     * @param guarantee guarantee forwarded with the command
     * @return the {@link Awaitable} returned by {@code sendCommand}
     */
    @Override
    public Awaitable deleteEvents(String str, Guarantee guarantee) {
        return sendCommand(new DeleteEvents(str, guarantee));
    }
}
