package works.bosk.drivers.mongo;

import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import works.bosk.Identifier;
import works.bosk.drivers.mongo.MappedDiagnosticContext;
import works.bosk.drivers.mongo.status.BsonComparator;

/* loaded from: input_file:works/bosk/drivers/mongo/ChangeReceiver.class */
class ChangeReceiver implements Closeable {
    private final String boskName;
    private final Identifier boskID;
    private final ChangeListener listener;
    private final MongoDriverSettings settings;
    private final MongoCollection<BsonDocument> collection;
    private static final AtomicLong EVENT_COUNTER = new AtomicLong(0);
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeReceiver.class);
    private final ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
    private volatile boolean isClosed = false;
    private final Exception creationPoint = new Exception("Additional context: ChangeReceiver creation stack trace:");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: works.bosk.drivers.mongo.ChangeReceiver$1, reason: invalid class name */
    /* loaded from: input_file:works/bosk/drivers/mongo/ChangeReceiver$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$mongodb$client$model$changestream$OperationType = new int[OperationType.values().length];

        static {
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.INSERT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.REPLACE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.RENAME.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.DROP.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.DROP_DATABASE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.INVALIDATE.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$com$mongodb$client$model$changestream$OperationType[OperationType.OTHER.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChangeReceiver(String str, Identifier identifier, ChangeListener changeListener, MongoDriverSettings mongoDriverSettings, MongoCollection<BsonDocument> mongoCollection) {
        this.boskName = str;
        this.boskID = identifier;
        this.listener = changeListener;
        this.settings = mongoDriverSettings;
        this.collection = mongoCollection;
        this.ex.scheduleWithFixedDelay(this::connectionLoop, 0L, mongoDriverSettings.recoveryPollingMS(), TimeUnit.MILLISECONDS);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isClosed = true;
        this.ex.shutdownNow();
    }

    /* JADX WARN: Finally extract failed */
    private void connectionLoop() {
        String name = Thread.currentThread().getName();
        Thread.currentThread().setName(getClass().getSimpleName() + " [" + this.boskName + "]");
        try {
            MappedDiagnosticContext.MDCScope mDCScope = MappedDiagnosticContext.setupMDC(this.boskName, this.boskID);
            try {
                LOGGER.debug("Starting connectionLoop task");
                while (!this.isClosed) {
                    try {
                        LOGGER.debug("Opening cursor");
                        try {
                            MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor = openCursor();
                            try {
                                try {
                                    try {
                                        try {
                                            try {
                                                try {
                                                    try {
                                                        try {
                                                            try {
                                                                this.listener.onConnectionSucceeded();
                                                                eventLoop(openCursor);
                                                            } catch (Throwable th) {
                                                                if (openCursor != null) {
                                                                    try {
                                                                        openCursor.close();
                                                                    } catch (Throwable th2) {
                                                                        th.addSuppressed(th2);
                                                                    }
                                                                }
                                                                throw th;
                                                            }
                                                        } catch (TimeoutException e) {
                                                            addContextToException(e);
                                                            LOGGER.warn("Timed out waiting for bosk state to initialize; will wait and retry", e);
                                                            this.listener.onDisconnect(e);
                                                            if (openCursor != null) {
                                                                openCursor.close();
                                                            }
                                                            LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                                            Thread.currentThread().setName(name);
                                                            if (mDCScope != null) {
                                                                mDCScope.close();
                                                                return;
                                                            }
                                                            return;
                                                        }
                                                    } catch (DisconnectedException e2) {
                                                        addContextToException(e2);
                                                        LOGGER.warn("Driver is disconnected; will wait and retry", e2);
                                                        if (openCursor != null) {
                                                            openCursor.close();
                                                        }
                                                        LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                                        Thread.currentThread().setName(name);
                                                        if (mDCScope != null) {
                                                            mDCScope.close();
                                                            return;
                                                        }
                                                        return;
                                                    }
                                                } catch (UninitializedCollectionException e3) {
                                                    addContextToException(e3);
                                                    LOGGER.warn("MongoDB collection is not initialized; will wait and retry", e3);
                                                    this.listener.onDisconnect(e3);
                                                    if (openCursor != null) {
                                                        openCursor.close();
                                                    }
                                                    LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                                    Thread.currentThread().setName(name);
                                                    if (mDCScope != null) {
                                                        mDCScope.close();
                                                        return;
                                                    }
                                                    return;
                                                }
                                            } catch (Throwable th3) {
                                                if (!this.isClosed) {
                                                    throw th3;
                                                }
                                                LOGGER.debug("Cursor is already closed; skipping usual error handling");
                                                if (openCursor != null) {
                                                    openCursor.close();
                                                }
                                                LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                                Thread.currentThread().setName(name);
                                                if (mDCScope != null) {
                                                    mDCScope.close();
                                                    return;
                                                }
                                                return;
                                            }
                                        } catch (InitialRootActionException e4) {
                                            addContextToException(e4);
                                            LOGGER.warn("Unable to initialize bosk state; will wait and retry", e4);
                                            this.listener.onDisconnect(e4);
                                            if (openCursor != null) {
                                                openCursor.close();
                                            }
                                            LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                            Thread.currentThread().setName(name);
                                            if (mDCScope != null) {
                                                mDCScope.close();
                                                return;
                                            }
                                            return;
                                        }
                                    } catch (UnrecognizedFormatException e5) {
                                        addContextToException(e5);
                                        LOGGER.warn("Unrecognized MongoDB database content format; will wait and retry", e5);
                                        this.listener.onDisconnect(e5);
                                        if (openCursor != null) {
                                            openCursor.close();
                                        }
                                        LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                        Thread.currentThread().setName(name);
                                        if (mDCScope != null) {
                                            mDCScope.close();
                                            return;
                                        }
                                        return;
                                    }
                                } catch (IOException e6) {
                                    addContextToException(e6);
                                    LOGGER.warn("Unexpected exception while processing MongoDB change events; will wait and retry", e6);
                                    this.listener.onDisconnect(e6);
                                    if (openCursor != null) {
                                        openCursor.close();
                                    }
                                    LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                    Thread.currentThread().setName(name);
                                    if (mDCScope != null) {
                                        mDCScope.close();
                                        return;
                                    }
                                    return;
                                } catch (InterruptedException e7) {
                                    addContextToException(e7);
                                    LOGGER.warn("Interrupted while processing MongoDB change events; reconnecting", e7);
                                    this.listener.onDisconnect(e7);
                                    if (openCursor != null) {
                                        openCursor.close();
                                    }
                                }
                            } catch (Error | RuntimeException e8) {
                                addContextToException(e8);
                                LOGGER.warn("Unexpected exception after connecting to MongoDB; will wait and retry", e8);
                                this.listener.onDisconnect(e8);
                                if (openCursor != null) {
                                    openCursor.close();
                                }
                                LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                Thread.currentThread().setName(name);
                                if (mDCScope != null) {
                                    mDCScope.close();
                                    return;
                                }
                                return;
                            } catch (UnexpectedEventProcessingException | UnprocessableEventException e9) {
                                addContextToException(e9);
                                LOGGER.warn("Unable to process MongoDB change event; reconnecting ({})", e9.getMessage(), e9);
                                this.listener.onDisconnect(e9);
                                if (openCursor != null) {
                                    openCursor.close();
                                }
                            }
                            if (this.isClosed) {
                                LOGGER.debug("Cursor is already closed; skipping usual error handling");
                                if (openCursor != null) {
                                    openCursor.close();
                                }
                                LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                                Thread.currentThread().setName(name);
                                if (mDCScope != null) {
                                    mDCScope.close();
                                    return;
                                }
                                return;
                            }
                            if (openCursor != null) {
                                openCursor.close();
                            }
                            LOGGER.trace("Change event processing returned normally");
                        } catch (RuntimeException e10) {
                            addContextToException(e10);
                            LOGGER.warn("Unable to connect to MongoDB database; will wait and retry", e10);
                            try {
                                this.listener.onConnectionFailed(e10);
                            } catch (InterruptedException | TimeoutException | InitialRootActionException e11) {
                                addContextToException(e10);
                                LOGGER.error("Error while running MongoDB connection failure handler; will wait and reconnect", e11);
                            }
                            LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                            Thread.currentThread().setName(name);
                            if (mDCScope != null) {
                                mDCScope.close();
                                return;
                            }
                            return;
                        }
                    } catch (Throwable th4) {
                        LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                        Thread.currentThread().setName(name);
                        throw th4;
                    }
                }
                LOGGER.debug("Ending connectionLoop task; isClosed={}", Boolean.valueOf(this.isClosed));
                Thread.currentThread().setName(name);
                if (mDCScope != null) {
                    mDCScope.close();
                }
            } finally {
            }
        } catch (RuntimeException e12) {
            addContextToException(e12);
            LOGGER.warn("connectionLoop task ended with unexpected {}; discarding", e12.getClass().getSimpleName(), e12);
        }
    }

    private void addContextToException(Throwable th) {
        th.addSuppressed(this.creationPoint);
    }

    private MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> openCursor() {
        MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor = this.collection.watch().maxAwaitTime(this.settings.recoveryPollingMS(), TimeUnit.MILLISECONDS).cursor();
        LOGGER.debug("Cursor is open");
        return cursor;
    }

    private void eventLoop(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> mongoChangeStreamCursor) throws UnprocessableEventException, UnexpectedEventProcessingException {
        try {
            if (this.isClosed) {
                LOGGER.debug("Receiver is closed");
                return;
            }
            try {
                try {
                    LOGGER.debug("Starting event loop");
                    while (!this.isClosed) {
                        try {
                            ChangeStreamDocument<BsonDocument> changeStreamDocument = (ChangeStreamDocument) mongoChangeStreamCursor.next();
                            if (!this.isClosed) {
                                processEvent(changeStreamDocument);
                            }
                        } catch (MongoInterruptedException e) {
                            LOGGER.debug("Interrupted while waiting for change event: {}", e.toString());
                        } catch (NoSuchElementException e2) {
                            LOGGER.debug("Cursor is finished");
                        }
                    }
                    LOGGER.debug("Exited event loop");
                } catch (DisconnectedException e3) {
                    LOGGER.trace("connectionLoop can handle this exception; rethrow it", e3);
                    throw e3;
                }
            } catch (RuntimeException e4) {
                addContextToException(e4);
                LOGGER.debug("Unexpected {} while processing events", e4.getClass().getSimpleName(), e4);
                throw new UnexpectedEventProcessingException(e4);
            }
        } catch (Throwable th) {
            LOGGER.debug("Exited event loop");
            throw th;
        }
    }

    private void processEvent(ChangeStreamDocument<BsonDocument> changeStreamDocument) throws UnprocessableEventException {
        if (this.settings.testing().eventDelayMS() > 0) {
            LOGGER.debug("| eventDelayMS {}ms ", Long.valueOf(this.settings.testing().eventDelayMS()));
            try {
                Thread.sleep(this.settings.testing().eventDelayMS());
            } catch (InterruptedException e) {
                LOGGER.debug("| Interrupted");
            }
        }
        try {
            MDC.put("bosk.MongoDriver.event", "e" + EVENT_COUNTER.incrementAndGet());
            switch (AnonymousClass1.$SwitchMap$com$mongodb$client$model$changestream$OperationType[changeStreamDocument.getOperationType().ordinal()]) {
                case 1:
                case 2:
                case 3:
                case BsonComparator.MAX_DIFFERENCES /* 4 */:
                    this.listener.onEvent(changeStreamDocument);
                    break;
                case 5:
                case 6:
                case 7:
                case 8:
                case 9:
                    throw new UnprocessableEventException("Disruptive event received", changeStreamDocument.getOperationType());
            }
            MDC.remove("bosk.MongoDriver.event");
        } catch (Throwable th) {
            MDC.remove("bosk.MongoDriver.event");
            throw th;
        }
    }
}
