package com.foilen.smalltools.mongodb.distributed;

import com.foilen.smalltools.hash.HashSha1;
import com.foilen.smalltools.mongodb.MongoDbChangeStreamWaitAnyChange;
import com.foilen.smalltools.mongodb.MongoDbManageCollectionTools;
import com.foilen.smalltools.tools.AbstractBasics;
import com.foilen.smalltools.tools.BufferBatchesTools;
import com.foilen.smalltools.tools.JsonTools;
import com.foilen.smalltools.tools.RetryTools;
import com.foilen.smalltools.tuple.Tuple2;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.bson.Document;

/* loaded from: input_file:com/foilen/smalltools/mongodb/distributed/MongoDbReplayableQueue.class */
public class MongoDbReplayableQueue<E> extends AbstractBasics implements BlockingQueue<E> {
    private final Class<E> entityType;
    private final MongoClient mongoClient;
    private final MongoCollection<Document> mongoCollection;
    private final long stopChangeStreamAfterNoThreadWaitedInMs;
    private MongoDbChangeStreamWaitAnyChange mongoDbChangeStreamWaitAnyChange;
    private long pointer;

    public MongoDbReplayableQueue(Class<E> cls, MongoClient mongoClient, MongoCollection<Document> mongoCollection) {
        this(cls, mongoClient, mongoCollection, 600000L, 3600L);
    }

    public MongoDbReplayableQueue(Class<E> cls, MongoClient mongoClient, MongoCollection<Document> mongoCollection, long j, long j2) {
        this.pointer = -1L;
        this.entityType = cls;
        this.mongoClient = mongoClient;
        this.mongoCollection = mongoCollection;
        this.stopChangeStreamAfterNoThreadWaitedInMs = j;
        MongoDbManageCollectionTools.addCollectionIfMissing(mongoClient, mongoCollection.getNamespace());
        MongoDbManageCollectionTools.manageIndexes(mongoCollection, Map.of("hashJsonValue_id", new Tuple2(new Document().append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, 1).append(MongoDbDistributedConstants.FIELD_ID, 1), new IndexOptions()), "createdAt_" + j2, new Tuple2(new Document().append(MongoDbDistributedConstants.FIELD_CREATED_AT, 1), new IndexOptions().expireAfter(Long.valueOf(j2), TimeUnit.SECONDS))));
    }

    public MongoDbReplayableQueue<E> movePointerToEnd() {
        Document document = (Document) this.mongoCollection.find().projection(new Document().append(MongoDbDistributedConstants.FIELD_ID, 1)).sort(Sorts.descending(new String[]{MongoDbDistributedConstants.FIELD_ID})).first();
        this.pointer = document == null ? -1L : document.getLong(MongoDbDistributedConstants.FIELD_ID).longValue();
        return this;
    }

    @Override // java.util.concurrent.BlockingQueue, java.util.Queue
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        RetryTools.retryBetween(3, 200, () -> {
            this.mongoClient.startSession().withTransaction(() -> {
                Document document = (Document) this.mongoCollection.find().projection(new Document().append(MongoDbDistributedConstants.FIELD_ID, 1)).sort(Sorts.descending(new String[]{MongoDbDistributedConstants.FIELD_ID})).first();
                long longValue = document == null ? 0L : document.getLong(MongoDbDistributedConstants.FIELD_ID).longValue() + 1;
                String compactPrintWithoutNulls = JsonTools.compactPrintWithoutNulls(e);
                this.mongoCollection.insertOne(new Document().append(MongoDbDistributedConstants.FIELD_ID, Long.valueOf(longValue)).append(MongoDbDistributedConstants.FIELD_JSON_VALUE, compactPrintWithoutNulls).append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, HashSha1.hashString(compactPrintWithoutNulls)).append(MongoDbDistributedConstants.FIELD_CREATED_AT, new Date()));
                return null;
            });
        });
        return true;
    }

    @Override // java.util.Collection
    public boolean addAll(Collection<? extends E> collection) {
        BufferBatchesTools.autoClose(10, list -> {
            RetryTools.retryBetween(3, 200, () -> {
                this.mongoClient.startSession().withTransaction(() -> {
                    Document document = (Document) this.mongoCollection.find().projection(new Document().append(MongoDbDistributedConstants.FIELD_ID, 1)).sort(Sorts.descending(new String[]{MongoDbDistributedConstants.FIELD_ID})).first();
                    long longValue = document == null ? 0L : document.getLong(MongoDbDistributedConstants.FIELD_ID).longValue() + 1;
                    ArrayList arrayList = new ArrayList();
                    for (E e : list) {
                        if (e == null) {
                            throw new NullPointerException();
                        }
                        String compactPrintWithoutNulls = JsonTools.compactPrintWithoutNulls(e);
                        new Document();
                        long j = longValue;
                        longValue = j + 1;
                        arrayList.add(arrayList.append(MongoDbDistributedConstants.FIELD_ID, Long.valueOf(j)).append(MongoDbDistributedConstants.FIELD_JSON_VALUE, compactPrintWithoutNulls).append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, HashSha1.hashString(compactPrintWithoutNulls)).append(MongoDbDistributedConstants.FIELD_CREATED_AT, new Date()));
                    }
                    this.mongoCollection.insertMany(arrayList);
                    return null;
                });
            });
        }, bufferBatchesTools -> {
            bufferBatchesTools.add((List) collection);
        });
        return true;
    }

    @Override // java.util.Queue
    public E peek() {
        Document document = (Document) this.mongoCollection.find(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer)))).sort(Sorts.ascending(new String[]{MongoDbDistributedConstants.FIELD_ID})).first();
        if (document == null) {
            return null;
        }
        return (E) JsonTools.readFromString(document.getString(MongoDbDistributedConstants.FIELD_JSON_VALUE), this.entityType);
    }

    @Override // java.util.concurrent.BlockingQueue, java.util.Collection
    public boolean contains(Object obj) {
        if (obj == null) {
            return false;
        }
        return this.mongoCollection.find(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer))).append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, HashSha1.hashString(JsonTools.compactPrintWithoutNulls(obj)))).first() != null;
    }

    @Override // java.util.Collection
    public boolean containsAll(Collection<?> collection) {
        if (collection == null) {
            throw new NullPointerException();
        }
        HashSet hashSet = new HashSet();
        collection.forEach(obj -> {
            if (obj == null) {
                throw new NullPointerException();
            }
            hashSet.add(HashSha1.hashString(JsonTools.compactPrintWithoutNulls(obj)));
        });
        return StreamSupport.stream(this.mongoCollection.find(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer))).append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, new Document().append("$in", hashSet))).projection(new Document().append(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE, 1)).spliterator(), false).map(document -> {
            return document.getString(MongoDbDistributedConstants.FIELD_HASH_JSON_VALUE);
        }).sorted().distinct().count() == ((long) hashSet.size());
    }

    @Override // java.util.Collection, java.lang.Iterable
    public Iterator<E> iterator() {
        throw new UnsupportedOperationException("Not implemented");
    }

    @Override // java.util.Collection
    public Object[] toArray() {
        return StreamSupport.stream(this.mongoCollection.find(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer)))).sort(Sorts.ascending(new String[]{MongoDbDistributedConstants.FIELD_ID})).projection(new Document().append(MongoDbDistributedConstants.FIELD_JSON_VALUE, 1)).spliterator(), false).map(document -> {
            return JsonTools.readFromString(document.getString(MongoDbDistributedConstants.FIELD_JSON_VALUE), this.entityType);
        }).toArray();
    }

    @Override // java.util.Queue
    public synchronized E poll() {
        Document document = (Document) this.mongoCollection.find(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer)))).first();
        if (document == null) {
            return null;
        }
        this.pointer = document.getLong(MongoDbDistributedConstants.FIELD_ID).longValue();
        return (E) JsonTools.readFromString(document.getString(MongoDbDistributedConstants.FIELD_JSON_VALUE), this.entityType);
    }

    @Override // java.util.concurrent.BlockingQueue
    public E poll(long j, TimeUnit timeUnit) throws InterruptedException {
        E poll = poll();
        if (poll != null) {
            return poll;
        }
        long currentTimeMillis = System.currentTimeMillis() + timeUnit.toMillis(j);
        if (currentTimeMillis < 0) {
            currentTimeMillis = Long.MAX_VALUE;
        }
        while (poll == null && System.currentTimeMillis() < currentTimeMillis) {
            waitForChange(currentTimeMillis - System.currentTimeMillis());
            poll = poll();
        }
        return poll;
    }

    private void waitForChange(long j) throws InterruptedException {
        synchronized (this) {
            if (this.mongoDbChangeStreamWaitAnyChange == null) {
                this.mongoDbChangeStreamWaitAnyChange = new MongoDbChangeStreamWaitAnyChange(this.mongoCollection, this.stopChangeStreamAfterNoThreadWaitedInMs, "insert", new String[0]);
            }
        }
        this.mongoDbChangeStreamWaitAnyChange.waitForChange(j);
    }

    @Override // java.util.concurrent.BlockingQueue
    public int drainTo(Collection<? super E> collection, int i) {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.Collection
    public boolean removeAll(Collection<?> collection) {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.Collection
    public boolean retainAll(Collection<?> collection) {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.Collection
    public void clear() {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.Collection
    public int size() {
        long countDocuments = this.mongoCollection.countDocuments(new Document().append(MongoDbDistributedConstants.FIELD_ID, new Document().append("$gt", Long.valueOf(this.pointer))));
        if (countDocuments > 2147483647L) {
            return Integer.MAX_VALUE;
        }
        return (int) countDocuments;
    }

    @Override // java.util.concurrent.BlockingQueue
    public boolean offer(E e, long j, TimeUnit timeUnit) {
        return offer(e);
    }

    @Override // java.util.concurrent.BlockingQueue, java.util.Queue, java.util.Collection
    public boolean add(E e) {
        return offer(e);
    }

    @Override // java.util.concurrent.BlockingQueue
    public void put(E e) {
        offer(e);
    }

    @Override // java.util.Queue
    public E remove() {
        E poll = poll();
        if (poll == null) {
            throw new NoSuchElementException();
        }
        return poll;
    }

    @Override // java.util.concurrent.BlockingQueue
    public E take() throws InterruptedException {
        return poll(Long.MAX_VALUE, TimeUnit.DAYS);
    }

    @Override // java.util.concurrent.BlockingQueue
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    @Override // java.util.Queue
    public E element() {
        E peek = peek();
        if (peek == null) {
            throw new NoSuchElementException();
        }
        return peek;
    }

    @Override // java.util.concurrent.BlockingQueue, java.util.Collection
    public boolean remove(Object obj) {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.concurrent.BlockingQueue
    public int drainTo(Collection<? super E> collection) {
        throw new UnsupportedOperationException("Cannot delete elements");
    }

    @Override // java.util.Collection
    public boolean isEmpty() {
        return size() == 0;
    }

    @Override // java.util.Collection
    public <T> T[] toArray(T[] tArr) {
        throw new UnsupportedOperationException();
    }
}
