package io.mantisrx.extensions.dynamodb;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.LockItem;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.subjects.BehaviorSubject;

/**
 * {@link MasterMonitor} implementation that tracks the current master by periodically reading a
 * lock item through an {@link AmazonDynamoDBLockClient}.
 *
 * <p>Every {@code pollInterval} a single background daemon thread looks up the lock stored under
 * {@code partitionKey}, decodes the lock's data payload as a JSON {@link MasterDescription}, and
 * publishes the result on {@link #getMasterObservable()} whenever it differs from the previously
 * observed value.
 */
public class DynamoDBMasterMonitor extends BaseService implements MasterMonitor {
    private static final Logger logger = LoggerFactory.getLogger(DynamoDBMasterMonitor.class);

    /** Stand-in used for comparisons and logging when no (parsable) master is known. */
    private static final MasterDescription MASTER_NULL = new MasterDescription("NONE", "localhost", -1, -1, -1, "uri://", -1, -1);

    private final AmazonDynamoDBLockClient lockClient;
    private final String partitionKey;
    private final Duration pollInterval;
    private final Duration gracefulShutdown;

    // NOTE: must be declared before leaderMonitor, which consumes it during field initialization.
    private final ThreadFactory monitorThreadFactory = runnable -> {
        final Thread thread = new Thread(runnable);
        thread.setName("dynamodb-monitor-" + System.currentTimeMillis());
        thread.setDaemon(true);
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setUncaughtExceptionHandler((failedThread, cause) ->
            logger.error("thread: {} failed with {}", failedThread.getName(), cause.getMessage(), cause));
        return thread;
    };
    private final ScheduledExecutorService leaderMonitor = Executors.newScheduledThreadPool(1, this.monitorThreadFactory);

    /** Most recently observed master; {@code null} until a lookup succeeds or when no leader exists. */
    private final AtomicReference<MasterDescription> latestMaster = new AtomicReference<>();
    private final ObjectMapper jsonMapper = DefaultObjectMapper.getInstance();
    private final BehaviorSubject<MasterDescription> masterSubject = BehaviorSubject.create();

    /**
     * Creates a monitor configured from {@link DynamoDBClientSingleton}: the poll interval and the
     * graceful shutdown window are parsed (ISO-8601 duration text, per {@link Duration#parse}) from
     * its {@link DynamoDBConfig}, and the lock client and partition key are taken from the singleton.
     */
    public DynamoDBMasterMonitor() {
        final DynamoDBConfig conf = DynamoDBClientSingleton.getDynamoDBConf();
        this.pollInterval = Duration.parse(conf.getDynamoDBLeaderHeartbeatDuration());
        this.gracefulShutdown = Duration.parse(conf.getDynamoDBMonitorGracefulShutdownDuration());
        this.lockClient = DynamoDBClientSingleton.getLockClient();
        this.partitionKey = DynamoDBClientSingleton.getPartitionKey();
    }

    /**
     * Creates a monitor with explicitly supplied collaborators.
     *
     * @param lockClient       client used to look up the leader lock
     * @param partitionKey     key of the lock item that holds the leader description
     * @param pollInterval     delay between successive leader lookups
     * @param gracefulShutdown maximum time {@link #shutdown()} waits for the poller to finish
     */
    public DynamoDBMasterMonitor(AmazonDynamoDBLockClient lockClient, String partitionKey, Duration pollInterval, Duration gracefulShutdown) {
        this.lockClient = lockClient;
        this.partitionKey = partitionKey;
        this.pollInterval = pollInterval;
        this.gracefulShutdown = gracefulShutdown;
    }

    /** Starts polling for the current leader immediately and then once per poll interval. */
    public void start() {
        this.leaderMonitor.scheduleAtFixedRate(this::getCurrentLeader, 0L, this.pollInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the leader poller and then closes the lock client.
     *
     * <p>The poller is asked to stop first so that no lookup is issued against a closed client. If
     * it does not finish within the graceful shutdown window, or the calling thread is interrupted
     * while waiting, the poller is cancelled forcibly. The caller's interrupt status is preserved.
     */
    public void shutdown() {
        // awaitTermination() only waits; without shutdown() the executor would never terminate and
        // the wait would always run for the full grace period.
        this.leaderMonitor.shutdown();
        boolean interrupted = false;
        try {
            if (!this.leaderMonitor.awaitTermination(this.gracefulShutdown.toMillis(), TimeUnit.MILLISECONDS)) {
                this.leaderMonitor.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.error("error timeout waiting on leader monitor to terminate executor", e);
            this.leaderMonitor.shutdownNow();
            interrupted = true;
        }

        logger.info("close the lock client");
        try {
            this.lockClient.close();
        } catch (IOException e) {
            logger.error("error closing the dynamodb lock client", e);
        }
        logger.info("leader monitor shutdown");

        if (interrupted) {
            // Restore the flag only after cleanup so closing the client is not affected by it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Performs one leader lookup and forwards the result to {@link #updateLeader}.
     *
     * <p>A missing lock, or a lock without data, is reported as {@code null}. Runtime failures are
     * logged and swallowed: an exception escaping a task scheduled with
     * {@code scheduleAtFixedRate} suppresses every subsequent execution, which would silently stop
     * leader monitoring. On failure the last known leader is left untouched.
     */
    private void getCurrentLeader() {
        logger.info("attempting leader lookup");
        try {
            final Optional<LockItem> lock = this.lockClient.getLock(this.partitionKey, Optional.empty());
            if (!lock.isPresent()) {
                logger.warn("no leader found");
            }
            final MasterDescription current = lock
                .flatMap(LockItem::getData)
                .map(this::bytesToMaster)
                .orElse(null);
            updateLeader(current);
        } catch (RuntimeException e) {
            logger.error("leader lookup failed, will retry on next poll", e);
        }
    }

    /**
     * Records {@code masterDescription} as the latest master and notifies subscribers if it differs
     * from the previously recorded one.
     *
     * <p>For the comparison (and the log line) {@code null} is treated as {@link #MASTER_NULL}.
     * The value emitted to subscribers is the raw argument, so it is {@code null} when the leader
     * has disappeared.
     *
     * @param masterDescription newly observed master, or {@code null} if none was found
     */
    private void updateLeader(@Nullable MasterDescription masterDescription) {
        final MasterDescription previousRaw = this.latestMaster.getAndSet(masterDescription);
        final MasterDescription next = masterDescription == null ? MASTER_NULL : masterDescription;
        final MasterDescription previous = previousRaw == null ? MASTER_NULL : previousRaw;
        if (previous.equals(next)) {
            return;
        }
        logger.info("leader changer information previous {} and next {}", previous.getHostname(), next.getHostname());
        this.masterSubject.onNext(masterDescription);
    }

    /**
     * Decodes the JSON payload of a lock item into a {@link MasterDescription}.
     *
     * @param byteBuffer lock data; if it has already been fully consumed it is rewound first
     * @return the parsed description, or {@link #MASTER_NULL} if the bytes cannot be parsed
     */
    private MasterDescription bytesToMaster(ByteBuffer byteBuffer) {
        // A buffer with nothing remaining has been read before; rewind so its content is read again.
        if (!byteBuffer.hasRemaining()) {
            byteBuffer.rewind();
        }
        final byte[] payload = new byte[byteBuffer.remaining()];
        byteBuffer.get(payload);
        try {
            return this.jsonMapper.readValue(payload, MasterDescription.class);
        } catch (IOException e) {
            logger.error("unable to parse master description bytes: {}", byteBuffer, e);
            return MASTER_NULL;
        }
    }

    /**
     * Returns the stream on which leader changes are published.
     *
     * @return observable of master descriptions; an emitted item may be {@code null} when no
     *     leader is found
     */
    public Observable<MasterDescription> getMasterObservable() {
        return this.masterSubject;
    }

    /**
     * Returns the master recorded by the most recent successful lookup.
     *
     * @return the latest master, or {@code null} if none has been observed or no leader exists
     */
    @Nullable
    public MasterDescription getLatestMaster() {
        return this.latestMaster.get();
    }
}
