package org.enodeframework.mongo;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.MongoWriteException;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.enodeframework.common.exception.EventStoreException;
import org.enodeframework.common.exception.IORuntimeException;
import org.enodeframework.common.function.Action2;
import org.enodeframework.common.io.IOHelper;
import org.enodeframework.common.serializing.ISerializeService;
import org.enodeframework.eventing.AggregateEventAppendResult;
import org.enodeframework.eventing.BatchAggregateEventAppendResult;
import org.enodeframework.eventing.DomainEventStream;
import org.enodeframework.eventing.EventAppendResult;
import org.enodeframework.eventing.EventAppendStatus;
import org.enodeframework.eventing.IEventSerializer;
import org.enodeframework.eventing.IEventStore;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/enodeframework/mongo/MongoEventStore.class */
public class MongoEventStore implements IEventStore {
    private static final Logger logger = LoggerFactory.getLogger(MongoEventStore.class);
    private static final Pattern PATTERN = Pattern.compile("\\{.+?commandId: \"(.+?)\" }$");
    private final int duplicateCode;
    private final String versionIndexName;
    private final String commandIndexName;
    private final MongoClient mongoClient;
    private final MongoConfiguration mongoConfiguration;
    private final IEventSerializer eventSerializer;
    private final ISerializeService serializeService;

    public MongoEventStore(MongoClient mongoClient, IEventSerializer iEventSerializer, ISerializeService iSerializeService) {
        this(mongoClient, new MongoConfiguration(), iEventSerializer, iSerializeService);
    }

    public MongoEventStore(MongoClient mongoClient, MongoConfiguration mongoConfiguration, IEventSerializer iEventSerializer, ISerializeService iSerializeService) {
        this.mongoClient = mongoClient;
        this.eventSerializer = iEventSerializer;
        this.mongoConfiguration = mongoConfiguration;
        this.duplicateCode = mongoConfiguration.getDuplicateCode();
        this.versionIndexName = mongoConfiguration.getEventTableVersionUniqueIndexName();
        this.commandIndexName = mongoConfiguration.getEventTableCommandIdUniqueIndexName();
        this.serializeService = iSerializeService;
    }

    public CompletableFuture<EventAppendResult> batchAppendAsync(List<DomainEventStream> list) {
        CompletableFuture<EventAppendResult> completableFuture = new CompletableFuture<>();
        EventAppendResult eventAppendResult = new EventAppendResult();
        if (list.size() == 0) {
            completableFuture.complete(eventAppendResult);
            return completableFuture;
        }
        Map map = (Map) list.stream().distinct().collect(Collectors.groupingBy((v0) -> {
            return v0.getAggregateRootId();
        }));
        BatchAggregateEventAppendResult batchAggregateEventAppendResult = new BatchAggregateEventAppendResult(map.keySet().size());
        for (Map.Entry entry : map.entrySet()) {
            batchAppendAggregateEventsAsync((String) entry.getKey(), (List) entry.getValue(), batchAggregateEventAppendResult, 0);
        }
        return batchAggregateEventAppendResult.taskCompletionSource;
    }

    private void batchAppendAggregateEventsAsync(String str, List<DomainEventStream> list, BatchAggregateEventAppendResult batchAggregateEventAppendResult, int i) {
        IOHelper.tryAsyncActionRecursively("BatchAppendAggregateEventsAsync", () -> {
            return batchAppendAggregateEventsAsync(str, list);
        }, aggregateEventAppendResult -> {
            batchAggregateEventAppendResult.addCompleteAggregate(str, aggregateEventAppendResult);
        }, () -> {
            return String.format("[aggregateRootId: %s, eventStreamCount: %s]", str, Integer.valueOf(list.size()));
        }, (Action2) null, i, true);
    }

    private CompletableFuture<DomainEventStream> tryFindEventByCommandIdAsync(String str, String str2, List<String> list, int i) {
        CompletableFuture<DomainEventStream> completableFuture = new CompletableFuture<>();
        IOHelper.tryAsyncActionRecursively("TryFindEventByCommandIdAsync", () -> {
            return findAsync(str, str2);
        }, domainEventStream -> {
            if (domainEventStream != null) {
                list.add(domainEventStream.getCommandId());
            }
            completableFuture.complete(domainEventStream);
        }, () -> {
            return String.format("[aggregateRootId:%s, commandId:%s]", str, str2);
        }, (Action2) null, i, true);
        return completableFuture;
    }

    private CompletableFuture<AggregateEventAppendResult> batchAppendAggregateEventsAsync(String str, List<DomainEventStream> list) {
        ArrayList newArrayList = Lists.newArrayList();
        for (DomainEventStream domainEventStream : list) {
            Document document = new Document();
            document.put("aggregateRootId", domainEventStream.getAggregateRootId());
            document.put("aggregateRootTypeName", domainEventStream.getAggregateRootTypeName());
            document.put("commandId", domainEventStream.getCommandId());
            document.put("version", Integer.valueOf(domainEventStream.getVersion()));
            document.put("gmtCreate", domainEventStream.getTimestamp());
            document.put("events", this.serializeService.serialize(this.eventSerializer.serialize(domainEventStream.events())));
            newArrayList.add(document);
        }
        return (newArrayList.size() > 1 ? batchInsertAsync(newArrayList) : insertOneByOneAsync(newArrayList)).exceptionally(th -> {
            int i = 0;
            String str2 = "";
            if (th instanceof MongoWriteException) {
                MongoWriteException mongoWriteException = (MongoWriteException) th;
                i = mongoWriteException.getCode();
                str2 = mongoWriteException.getMessage();
            }
            if (th instanceof MongoBulkWriteException) {
                MongoBulkWriteException mongoBulkWriteException = (MongoBulkWriteException) th;
                if (mongoBulkWriteException.getWriteErrors().size() >= 1) {
                    BulkWriteError bulkWriteError = (BulkWriteError) mongoBulkWriteException.getWriteErrors().get(0);
                    i = bulkWriteError.getCode();
                    str2 = bulkWriteError.getMessage();
                }
            }
            if (i == this.duplicateCode && str2.contains(this.versionIndexName)) {
                AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
                aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.DuplicateEvent);
                return aggregateEventAppendResult;
            }
            if (i != this.duplicateCode || !str2.contains(this.commandIndexName)) {
                logger.error("Batch append event has unknown exception.", th);
                throw new EventStoreException(th);
            }
            AggregateEventAppendResult aggregateEventAppendResult2 = new AggregateEventAppendResult();
            aggregateEventAppendResult2.setEventAppendStatus(EventAppendStatus.DuplicateCommand);
            String parseDuplicateCommandId = parseDuplicateCommandId(str2);
            if (Strings.isNullOrEmpty(parseDuplicateCommandId)) {
                return aggregateEventAppendResult2;
            }
            aggregateEventAppendResult2.setDuplicateCommandIds(Lists.newArrayList(new String[]{parseDuplicateCommandId}));
            return aggregateEventAppendResult2;
        });
    }

    public CompletableFuture<AggregateEventAppendResult> batchInsertAsync(List<Document> list) {
        final CompletableFuture<AggregateEventAppendResult> completableFuture = new CompletableFuture<>();
        this.mongoClient.getDatabase(this.mongoConfiguration.getDatabaseName()).getCollection(this.mongoConfiguration.getEventCollectionName()).insertMany(list).subscribe(new Subscriber<InsertManyResult>() { // from class: org.enodeframework.mongo.MongoEventStore.1
            public void onSubscribe(Subscription subscription) {
                subscription.request(1L);
            }

            public void onNext(InsertManyResult insertManyResult) {
                AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
                aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.Success);
                completableFuture.complete(aggregateEventAppendResult);
            }

            public void onError(Throwable th) {
                completableFuture.completeExceptionally(th);
            }

            public void onComplete() {
                AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
                aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.Success);
                completableFuture.complete(aggregateEventAppendResult);
            }
        });
        return completableFuture;
    }

    public CompletableFuture<AggregateEventAppendResult> insertOneByOneAsync(List<Document> list) {
        final CompletableFuture<AggregateEventAppendResult> completableFuture = new CompletableFuture<>();
        final CountDownLatch countDownLatch = new CountDownLatch(list.size());
        Iterator<Document> it = list.iterator();
        while (it.hasNext()) {
            this.mongoClient.getDatabase(this.mongoConfiguration.getDatabaseName()).getCollection(this.mongoConfiguration.getEventCollectionName()).insertOne(it.next()).subscribe(new Subscriber<InsertOneResult>() { // from class: org.enodeframework.mongo.MongoEventStore.2
                public void onSubscribe(Subscription subscription) {
                    subscription.request(1L);
                }

                public void onNext(InsertOneResult insertOneResult) {
                    countDownLatch.countDown();
                }

                public void onError(Throwable th) {
                    countDownLatch.countDown();
                    completableFuture.completeExceptionally(th);
                }

                public void onComplete() {
                    if (countDownLatch.getCount() == 0) {
                        AggregateEventAppendResult aggregateEventAppendResult = new AggregateEventAppendResult();
                        aggregateEventAppendResult.setEventAppendStatus(EventAppendStatus.Success);
                        completableFuture.complete(aggregateEventAppendResult);
                    }
                }
            });
            if (completableFuture.isDone()) {
                break;
            }
        }
        return completableFuture;
    }

    private String parseDuplicateCommandId(String str) {
        Matcher matcher = PATTERN.matcher(str);
        return (matcher.find() && matcher.groupCount() == 1) ? matcher.group(1) : "";
    }

    public CompletableFuture<List<DomainEventStream>> queryAggregateEventsAsync(String str, String str2, int i, int i2) {
        return IOHelper.tryIOFuncAsync(() -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            this.mongoClient.getDatabase(this.mongoConfiguration.getDatabaseName()).getCollection(this.mongoConfiguration.getEventCollectionName()).find(Filters.and(new Bson[]{Filters.eq("aggregateRootId", str), Filters.gte("version", Integer.valueOf(i)), Filters.lte("version", Integer.valueOf(i2))})).sort(Sorts.ascending(new String[]{"version"})).subscribe(new Subscriber<Document>() { // from class: org.enodeframework.mongo.MongoEventStore.3
                final List streams = Lists.newArrayList();

                public void onSubscribe(Subscription subscription) {
                    subscription.request(Long.MAX_VALUE);
                }

                public void onNext(Document document) {
                    this.streams.add(new DomainEventStream(document.getString("commandId"), document.getString("aggregateRootId"), document.getString("aggregateRootTypeName"), (Date) document.get("gmtCreate", Date.class), MongoEventStore.this.eventSerializer.deserialize((Map) MongoEventStore.this.serializeService.deserialize(document.getString("events"), Map.class)), Maps.newHashMap()));
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onComplete() {
                    this.streams.sort(Comparator.comparingInt((v0) -> {
                        return v0.getVersion();
                    }));
                    completableFuture.complete(this.streams);
                }
            });
            return completableFuture.exceptionally(th -> {
                if (!(th instanceof MongoWriteException)) {
                    logger.error("Failed to query aggregate events async, aggregateRootId: {}, aggregateRootType: {}", new Object[]{str, str2, th});
                    throw new EventStoreException(th);
                }
                logger.error(String.format("Failed to query aggregate events async, aggregateRootId: %s, aggregateRootType: %s", str, str2), (MongoWriteException) th);
                throw new IORuntimeException(th);
            });
        }, "QueryAggregateEventsAsync");
    }

    public CompletableFuture<DomainEventStream> findAsync(String str, int i) {
        return IOHelper.tryIOFuncAsync(() -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            this.mongoClient.getDatabase(this.mongoConfiguration.getDatabaseName()).getCollection(this.mongoConfiguration.getEventCollectionName()).find(Filters.and(new Bson[]{Filters.eq("aggregateRootId", str), Filters.eq("version", Integer.valueOf(i))})).subscribe(new Subscriber<Document>() { // from class: org.enodeframework.mongo.MongoEventStore.4
                private DomainEventStream eventStream;

                public void onSubscribe(Subscription subscription) {
                    subscription.request(1L);
                }

                public void onNext(Document document) {
                    DomainEventStream domainEventStream = new DomainEventStream(document.getString("commandId"), document.getString("aggregateRootId"), document.getString("aggregateRootTypeName"), (Date) document.get("gmtCreate", Date.class), MongoEventStore.this.eventSerializer.deserialize((Map) MongoEventStore.this.serializeService.deserialize(document.getString("events"), Map.class)), Maps.newHashMap());
                    this.eventStream = domainEventStream;
                    completableFuture.complete(domainEventStream);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onComplete() {
                    completableFuture.complete(this.eventStream);
                }
            });
            return completableFuture.exceptionally(th -> {
                if (th instanceof MongoWriteException) {
                    logger.error("Find event by version has sql exception, aggregateRootId: {}, version: {}", new Object[]{str, Integer.valueOf(i), (MongoWriteException) th});
                    throw new IORuntimeException(th);
                }
                logger.error("Find event by version has unknown exception, aggregateRootId: {}, version: {}", new Object[]{str, Integer.valueOf(i), th});
                throw new EventStoreException(th);
            });
        }, "FindEventByVersionAsync");
    }

    public CompletableFuture<DomainEventStream> findAsync(String str, String str2) {
        return IOHelper.tryIOFuncAsync(() -> {
            final CompletableFuture completableFuture = new CompletableFuture();
            this.mongoClient.getDatabase(this.mongoConfiguration.getDatabaseName()).getCollection(this.mongoConfiguration.getEventCollectionName()).find(Filters.and(new Bson[]{Filters.eq("aggregateRootId", str), Filters.eq("commandId", str2)})).subscribe(new Subscriber<Document>() { // from class: org.enodeframework.mongo.MongoEventStore.5
                private DomainEventStream eventStream;

                public void onSubscribe(Subscription subscription) {
                    subscription.request(1L);
                }

                public void onNext(Document document) {
                    DomainEventStream domainEventStream = new DomainEventStream(document.getString("commandId"), document.getString("aggregateRootId"), document.getString("aggregateRootTypeName"), (Date) document.get("gmtCreate", Date.class), MongoEventStore.this.eventSerializer.deserialize((Map) MongoEventStore.this.serializeService.deserialize(document.getString("events"), Map.class)), Maps.newHashMap());
                    this.eventStream = domainEventStream;
                    completableFuture.complete(domainEventStream);
                }

                public void onError(Throwable th) {
                    completableFuture.completeExceptionally(th);
                }

                public void onComplete() {
                    completableFuture.complete(this.eventStream);
                }
            });
            return completableFuture.exceptionally(th -> {
                if (th instanceof MongoWriteException) {
                    logger.error("Find event by commandId has sql exception, aggregateRootId: {}, commandId: {}", new Object[]{str, str2, (MongoWriteException) th});
                    throw new IORuntimeException(th);
                }
                logger.error("Find event by commandId has unknown exception, aggregateRootId: {}, commandId: {}", new Object[]{str, str2, th});
                throw new EventStoreException(th);
            });
        }, "FindEventByCommandIdAsync");
    }
}
