package io.activej.crdt.storage.cluster;

import com.sun.nio.file.SensitivityWatchEventModifier;
import io.activej.async.exception.AsyncCloseException;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.service.ReactiveService;
import io.activej.common.exception.MalformedDataException;
import io.activej.crdt.CrdtException;
import io.activej.crdt.storage.cluster.AbstractDiscoveryService;
import io.activej.crdt.storage.cluster.IDiscoveryService;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/* loaded from: input_file:io/activej/crdt/storage/cluster/FileDiscoveryService.class */
public final class FileDiscoveryService extends AbstractDiscoveryService implements ReactiveService {
    private static final SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> UPDATE_CONSUMED = new SettablePromise<>();
    private final WatchService watchService;
    private final Path pathToFile;
    private final Set<PartitionSchemeSupplier> activeSuppliers;

    /* loaded from: input_file:io/activej/crdt/storage/cluster/FileDiscoveryService$Builder.class */
    public final class Builder extends AbstractDiscoveryService.Builder<Builder, FileDiscoveryService> {
        private Builder() {
            super();
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/cluster/FileDiscoveryService$PartitionSchemeSupplier.class */
    private class PartitionSchemeSupplier implements AsyncSupplier<IDiscoveryService.PartitionScheme<PartitionId>> {
        final AtomicReference<SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>>> cbRef = new AtomicReference<>(FileDiscoveryService.UPDATE_CONSUMED);
        final Thread watchThread;
        static final /* synthetic */ boolean $assertionsDisabled;

        private PartitionSchemeSupplier() {
            FileDiscoveryService.this.activeSuppliers.add(this);
            this.watchThread = new Thread(this::watch);
            this.watchThread.setDaemon(true);
            this.watchThread.start();
        }

        public Promise<IDiscoveryService.PartitionScheme<PartitionId>> get() {
            SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> settablePromise = this.cbRef.get();
            if (settablePromise != FileDiscoveryService.UPDATE_CONSUMED && !settablePromise.isComplete()) {
                return Promise.ofException(new CrdtException("Previous promise has not been completed yet"));
            }
            while (this.watchThread.isAlive()) {
                if (settablePromise != FileDiscoveryService.UPDATE_CONSUMED) {
                    return this.cbRef.getAndSet(FileDiscoveryService.UPDATE_CONSUMED);
                }
                SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> settablePromise2 = new SettablePromise<>();
                if (this.cbRef.compareAndSet(FileDiscoveryService.UPDATE_CONSUMED, settablePromise2)) {
                    FileDiscoveryService.this.reactor.startExternalTask();
                    return settablePromise2;
                }
                settablePromise = this.cbRef.get();
            }
            return Promise.ofException(new CrdtException("Watch service has been closed"));
        }

        private void watch() {
            onChange();
            while (true) {
                try {
                    WatchKey poll = FileDiscoveryService.this.watchService.poll(100L, TimeUnit.MILLISECONDS);
                    if (poll != null) {
                        for (WatchEvent<?> watchEvent : poll.pollEvents()) {
                            if (FileDiscoveryService.this.pathToFile.equals(FileDiscoveryService.this.pathToFile.resolveSibling((Path) watchEvent.context()))) {
                                WatchEvent.Kind<?> kind = watchEvent.kind();
                                if (kind == StandardWatchEventKinds.ENTRY_CREATE || kind == StandardWatchEventKinds.ENTRY_MODIFY) {
                                    onChange();
                                } else if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
                                    onError(new FileNotFoundException(FileDiscoveryService.this.pathToFile.toString()));
                                }
                            }
                        }
                        if (!poll.reset()) {
                            onError(new CrdtException("Watch key is no longer valid"));
                            return;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    onError(e);
                    return;
                } catch (ClosedWatchServiceException e2) {
                    onError(e2);
                    return;
                }
            }
        }

        private void onChange() {
            try {
                RendezvousPartitionScheme<PartitionId> parseScheme = FileDiscoveryService.this.parseScheme(Files.readAllBytes(FileDiscoveryService.this.pathToFile));
                completeCb(settablePromise -> {
                    settablePromise.set(parseScheme);
                });
            } catch (IOException e) {
                onError(new CrdtException("Could not read from file", e));
            } catch (MalformedDataException e2) {
                onError(new CrdtException("Could not parse file content", e2));
            }
        }

        private void onError(Exception exc) {
            FileDiscoveryService.this.activeSuppliers.remove(this);
            completeCb(settablePromise -> {
                settablePromise.setException(exc);
            });
        }

        private void completeCb(Consumer<SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>>> consumer) {
            SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> settablePromise;
            SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> settablePromise2;
            do {
                settablePromise = this.cbRef.get();
                if (settablePromise != FileDiscoveryService.UPDATE_CONSUMED && !settablePromise.isComplete()) {
                    SettablePromise<IDiscoveryService.PartitionScheme<PartitionId>> andSet = this.cbRef.getAndSet(FileDiscoveryService.UPDATE_CONSUMED);
                    if (!$assertionsDisabled && andSet.isComplete()) {
                        throw new AssertionError();
                    }
                    FileDiscoveryService.this.reactor.execute(() -> {
                        consumer.accept(andSet);
                    });
                    FileDiscoveryService.this.reactor.completeExternalTask();
                    return;
                }
                settablePromise2 = new SettablePromise<>();
                consumer.accept(settablePromise2);
            } while (!this.cbRef.compareAndSet(settablePromise, settablePromise2));
        }

        static {
            $assertionsDisabled = !FileDiscoveryService.class.desiredAssertionStatus();
        }
    }

    private FileDiscoveryService(Reactor reactor, WatchService watchService, Path path) {
        super(reactor);
        this.activeSuppliers = new HashSet();
        this.watchService = watchService;
        this.pathToFile = path;
    }

    public static FileDiscoveryService create(Reactor reactor, WatchService watchService, Path path) throws CrdtException {
        return (FileDiscoveryService) builder(reactor, watchService, path).build();
    }

    public static FileDiscoveryService create(Reactor reactor, Path path) throws CrdtException {
        return (FileDiscoveryService) builder(reactor, path).build();
    }

    public static Builder builder(Reactor reactor, WatchService watchService, Path path) throws CrdtException {
        if (!Files.exists(path, new LinkOption[0])) {
            throw new CrdtException("File does not exist: " + path);
        }
        if (Files.isDirectory(path, new LinkOption[0])) {
            throw new CrdtException("File is a directory: " + path);
        }
        return new Builder();
    }

    public static Builder builder(Reactor reactor, Path path) throws CrdtException {
        try {
            return builder(reactor, path.getFileSystem().newWatchService(), path);
        } catch (IOException e) {
            throw new CrdtException("Could not create a watch service", e);
        }
    }

    @Override // io.activej.crdt.storage.cluster.IDiscoveryService
    public AsyncSupplier<IDiscoveryService.PartitionScheme<PartitionId>> discover() {
        Reactive.checkInReactorThread(this);
        try {
            this.pathToFile.getParent().register(this.watchService, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE}, SensitivityWatchEventModifier.HIGH);
            return new PartitionSchemeSupplier();
        } catch (IOException e) {
            CrdtException crdtException = new CrdtException("Could not register a path to the watch service", e);
            return () -> {
                return Promise.ofException(crdtException);
            };
        }
    }

    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        HashSet hashSet = new HashSet(this.activeSuppliers);
        this.activeSuppliers.clear();
        Exception asyncCloseException = new AsyncCloseException();
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((PartitionSchemeSupplier) it.next()).onError(asyncCloseException);
        }
        return Promise.complete();
    }
}
