package com.foilen.smalltools.mongodb;

import com.foilen.smalltools.DelayedEvent;
import com.foilen.smalltools.mongodb.distributed.MongoDbDistributedConstants;
import com.foilen.smalltools.tools.AbstractBasics;
import com.foilen.smalltools.tools.SecureRandomTools;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.changestream.FullDocument;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.bson.BsonString;
import org.bson.Document;

/* loaded from: input_file:com/foilen/smalltools/mongodb/MongoDbChangeStreamWaitAnyChange.class */
/**
 * Lets threads block until a change of one of the configured operation types is seen on a MongoDB
 * collection, either for any document or for one specific document id.
 *
 * <p>The change stream is consumed by a background thread that is started lazily by the first waiter.
 * A recurring {@link DelayedEvent} closes the stream once the idle period has elapsed and no thread
 * is blocked anymore; the next waiter starts it again.
 */
public class MongoDbChangeStreamWaitAnyChange extends AbstractBasics {
    private final MongoCollection<Document> mongoCollection;
    private final long stopAfterNoThreadWaitedInMs;
    /** Epoch millis after which the stream may be stopped. Volatile: written by waiters, read by the delayed check. */
    private volatile long stopAfter;
    /** The background consumer; guarded by {@code synchronized (this)}. */
    private Thread thread;
    /** The cursor currently iterated by the background thread, kept so the stop check can close that exact cursor. */
    private volatile AutoCloseable activeCursor;
    /** Set by the stop check so the consumer can tell a requested stop from a real failure. */
    private volatile boolean stopRequested;
    /** Signalled once per observed change, whatever the document. */
    private final Semaphore semaphore = new Semaphore(0);
    /** Signalled once per observed change on the document whose id is the key. */
    private final ConcurrentMap<Object, Semaphore> semaphoreById = new ConcurrentHashMap<>();
    private final List<String> changeTypes = new ArrayList<>();

    /**
     * @param mongoCollection             the collection to watch
     * @param stopAfterNoThreadWaitedInMs how long (ms) after the last wait the stream is kept open
     * @param changeType                  first value matched against the change event's {@code operationType}
     * @param moreChangeTypes             additional {@code operationType} values to match
     */
    public MongoDbChangeStreamWaitAnyChange(MongoCollection<Document> mongoCollection, long stopAfterNoThreadWaitedInMs, String changeType, String... moreChangeTypes) {
        this.mongoCollection = mongoCollection;
        this.stopAfterNoThreadWaitedInMs = stopAfterNoThreadWaitedInMs;
        this.changeTypes.add(changeType);
        this.changeTypes.addAll(Arrays.asList(moreChangeTypes));
    }

    /**
     * Waits until a change on any document is signalled or the timeout elapses. Returns right away
     * when a signal from an earlier change is still pending. Waiters already blocked are also woken
     * when the background thread starts.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForChange(long timeoutMs) throws InterruptedException {
        touchStopAfter();
        startIfNeeded();
        this.logger.debug("Waiting for change");
        // The outcome is deliberately ignored: a timeout and a signal are handled the same way by callers.
        this.semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        touchStopAfter();
    }

    /**
     * Waits until a change on the document with the given id is signalled or the timeout elapses.
     *
     * @param id        the document id, compared with the change event's document key
     * @param timeoutMs maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForChange(String id, long timeoutMs) throws InterruptedException {
        touchStopAfter();
        // Register the id before the stream starts so that changes on it are recorded from the beginning.
        this.semaphoreById.computeIfAbsent(id, key -> new Semaphore(0));
        startIfNeeded();
        this.logger.debug("Waiting for change for {}", id);
        // Looked up again on purpose: if the idle cleanup dropped the entry meanwhile, a registered one is recreated.
        Semaphore semaphoreForId = this.semaphoreById.computeIfAbsent(id, key -> new Semaphore(0));
        semaphoreForId.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        touchStopAfter();
    }

    /** Pushes the idle deadline forward from now. */
    private void touchStopAfter() {
        this.stopAfter = System.currentTimeMillis() + this.stopAfterNoThreadWaitedInMs;
    }

    /** Starts the background consumer and the recurring stop check when no consumer is running. */
    private void startIfNeeded() {
        synchronized (this) {
            if (this.thread != null) {
                return;
            }
            this.stopRequested = false;
            this.thread = new Thread(this::runChangeStream, "Change stream for " + this.mongoCollection.getNamespace() + "-" + SecureRandomTools.randomHexString(5));
            this.thread.start();
            checkLaterIfStop();
        }
    }

    /** Body of the background thread: opens the change stream and signals waiters for every event. */
    private void runChangeStream() {
        try {
            this.logger.info("Starting change stream");
            ChangeStreamIterable<Document> changeStream = this.mongoCollection
                    .watch(List.of(
                            Aggregates.match(new Document("operationType", new Document("$in", this.changeTypes))),
                            Aggregates.project(new Document("fullDocument", 0))))
                    .fullDocument(FullDocument.DEFAULT);

            // Wake every thread that is already blocked.
            this.semaphore.release(this.semaphore.getQueueLength());
            this.semaphoreById.values().forEach(waiting -> waiting.release(waiting.getQueueLength()));

            // Iterate a cursor we hold a reference to, so that the stop check closes this very cursor.
            trackCursor(changeStream.cursor()).forEachRemaining(changeStreamDocument -> {
                Object id = changeStreamDocument.getDocumentKey().get(MongoDbDistributedConstants.FIELD_ID);
                if (id instanceof BsonString) {
                    // Waiters register plain String ids, so unwrap the BSON value before the lookup.
                    id = ((BsonString) id).getValue();
                }
                this.logger.debug("Change {} for {} - {}", changeStreamDocument.getOperationTypeString(), this.mongoCollection.getNamespace(), id);
                Semaphore semaphoreForId = this.semaphoreById.get(id);
                if (semaphoreForId != null) {
                    semaphoreForId.release();
                }
                this.semaphore.release();
            });
        } catch (Exception e) {
            if (this.stopRequested) {
                // NOTE(review): closing the cursor from the stop check is expected to end the iteration
                // with an exception; confirm against the driver version in use.
                this.logger.debug("Change stream iteration ended by the requested stop", e);
            } else {
                this.logger.error("Problem with change stream", e);
            }
        } finally {
            closeActiveCursor();
            synchronized (this) {
                this.logger.info("Change stream stopped");
                this.thread = null;
            }
        }
    }

    /**
     * Records the cursor being iterated and hands it back unchanged. If a stop was requested before
     * the cursor was available, it is closed immediately.
     */
    private <C extends AutoCloseable> C trackCursor(C cursor) {
        this.activeCursor = cursor;
        if (this.stopRequested) {
            closeActiveCursor();
        }
        return cursor;
    }

    /** Closes the recorded cursor, if any. Safe to call from both the consumer and the stop check. */
    private void closeActiveCursor() {
        AutoCloseable cursor;
        synchronized (this) {
            cursor = this.activeCursor;
            this.activeCursor = null;
        }
        if (cursor == null) {
            return;
        }
        try {
            cursor.close();
        } catch (Exception e) {
            this.logger.warn("Problem closing the change stream cursor", e);
        }
    }

    /**
     * Schedules a check at the idle deadline. The check stops the stream when nobody is waiting;
     * otherwise it extends the deadline and schedules itself again.
     */
    private void checkLaterIfStop() {
        long delayMs = this.stopAfter - System.currentTimeMillis();
        if (delayMs <= 0) {
            delayMs = 100;
        }
        new DelayedEvent(delayMs, () -> {
            if (System.currentTimeMillis() > this.stopAfter) {
                // Forget the ids nobody is blocked on; the ones with blocked threads must stay registered.
                this.semaphoreById.entrySet().removeIf(entry -> !entry.getValue().hasQueuedThreads());
                if (!this.semaphore.hasQueuedThreads() && this.semaphoreById.isEmpty()) {
                    this.logger.info("Stopping change stream");
                    this.stopRequested = true;
                    closeActiveCursor();
                    return;
                }
                this.logger.info("Some threads are waiting. Will wait more");
                touchStopAfter();
            }
            checkLaterIfStop();
        });
    }
}
