package io.fluxcapacitor.javaclient.eventsourcing.websocket;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Backlog;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.eventsourcing.AppendEvents;
import io.fluxcapacitor.common.api.eventsourcing.EventBatch;
import io.fluxcapacitor.common.api.eventsourcing.GetEvents;
import io.fluxcapacitor.common.serialization.websocket.JsonDecoder;
import io.fluxcapacitor.common.serialization.websocket.JsonEncoder;
import io.fluxcapacitor.javaclient.common.connection.AbstractWebsocketService;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.common.serialization.jackson.JacksonSerializer;
import io.fluxcapacitor.javaclient.eventsourcing.EventStore;
import io.fluxcapacitor.javaclient.eventsourcing.Snapshot;
import io.fluxcapacitor.javaclient.keyvalue.KeyValueRepository;
import io.fluxcapacitor.javaclient.keyvalue.KeyValueService;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import javax.websocket.ClientEndpoint;
import javax.websocket.EncodeException;

@ClientEndpoint(encoders = {JsonEncoder.class}, decoders = {JsonDecoder.class})
/* loaded from: input_file:io/fluxcapacitor/javaclient/eventsourcing/websocket/WebSocketEventStore.class */
public class WebSocketEventStore extends AbstractWebsocketService implements EventStore {
    private final KeyValueRepository<Snapshot> snapshotRepository;
    private final Backlog<EventBatch> backlog;
    private final int fetchBatchSize;

    public WebSocketEventStore(String str, KeyValueService keyValueService) {
        this(URI.create(str), 1024, 1024, keyValueService, new JacksonSerializer());
    }

    public WebSocketEventStore(String str, int i, KeyValueService keyValueService) {
        this(URI.create(str), i, 1024, keyValueService, new JacksonSerializer());
    }

    public WebSocketEventStore(URI uri, int i, int i2, KeyValueService keyValueService, Serializer serializer) {
        super(uri);
        this.backlog = new Backlog<>(this::doSend, i);
        this.fetchBatchSize = i2;
        this.snapshotRepository = new KeyValueRepository<>(keyValueService, serializer);
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.EventStore
    public Awaitable storeEvents(String str, String str2, long j, List<Message> list) {
        return this.backlog.add(new EventBatch[]{new EventBatch(str, str2, j, list)});
    }

    private Awaitable doSend(List<EventBatch> list) throws IOException, EncodeException {
        getSession().getBasicRemote().sendObject(new AppendEvents(list));
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.EventStore
    public Stream<Message> getEvents(String str, long j) {
        return ObjectUtils.iterate(sendRequest(new GetEvents(str, Long.valueOf(j), this.fetchBatchSize)), getEventsResult -> {
            return sendRequest(new GetEvents(str, Long.valueOf(getEventsResult.getEventBatch().getLastSequenceNumber()), this.fetchBatchSize));
        }, getEventsResult2 -> {
            return getEventsResult2.getEventBatch().getEvents().size() < this.fetchBatchSize;
        }).flatMap(getEventsResult3 -> {
            return getEventsResult3.getEventBatch().getEvents().stream();
        });
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.EventStore
    public void storeSnapshot(Snapshot snapshot) {
        this.snapshotRepository.put(snapshot.getAggregateId(), snapshot);
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.EventStore
    public Optional<Snapshot> getSnapshot(String str) {
        return Optional.ofNullable(this.snapshotRepository.get(str));
    }

    @Override // io.fluxcapacitor.javaclient.eventsourcing.EventStore
    public void deleteSnapshot(String str) {
        this.snapshotRepository.delete(str);
    }
}
