/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.queue;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.tuple.Tuple2;
import io.activej.csp.file.ChannelFileReader;
import io.activej.csp.file.ChannelFileWriter;
import io.activej.csp.queue.ChannelQueue;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.ImplicitlyReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ChannelFileBuffer
extends ImplicitlyReactive
implements ChannelQueue<ByteBuf> {
    private static final Logger logger = LoggerFactory.getLogger(ChannelFileBuffer.class);
    private static final boolean CHECKS = Checks.isEnabled(ChannelFileBuffer.class);
    private final ChannelFileReader reader;
    private final ChannelFileWriter writer;
    private final Executor executor;
    private final Path path;
    @Nullable
    private SettablePromise<ByteBuf> take;
    private boolean finished = false;
    @Nullable
    private Exception exception;

    private ChannelFileBuffer(ChannelFileReader reader, ChannelFileWriter writer, Executor executor, Path path) {
        this.reader = reader;
        this.writer = writer;
        this.executor = executor;
        this.path = path;
    }

    public static Promise<ChannelFileBuffer> create(Executor executor, Path filePath) {
        return ChannelFileBuffer.create(executor, filePath, null);
    }

    public static Promise<ChannelFileBuffer> create(Executor executor, Path path, @Nullable MemSize limit) {
        return Promise.ofBlocking((Executor)executor, () -> {
            Files.createDirectories(path.getParent(), new FileAttribute[0]);
            FileChannel writerChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
            FileChannel readerChannel = FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.READ);
            return new Tuple2((Object)writerChannel, (Object)readerChannel);
        }).map(tuple2 -> {
            Reactor reactor = Reactor.getCurrentReactor();
            ChannelFileWriter writer = ChannelFileWriter.create(reactor, executor, (FileChannel)tuple2.value1());
            ChannelFileReader.Builder readerBuilder = ChannelFileReader.builder(reactor, executor, (FileChannel)tuple2.value2());
            if (limit != null) {
                readerBuilder.withLimit(limit.toLong());
            }
            ChannelFileReader reader = (ChannelFileReader)readerBuilder.build();
            return new ChannelFileBuffer(reader, writer, executor, path);
        });
    }

    @Override
    public Promise<Void> put(@Nullable ByteBuf item) {
        if (CHECKS) {
            Reactive.checkInReactorThread((Reactive)this);
        }
        if (this.exception != null) {
            return Promise.ofException((Exception)this.exception);
        }
        if (item == null) {
            this.finished = true;
        }
        if (this.take == null) {
            return this.writer.accept(item);
        }
        SettablePromise<ByteBuf> promise = this.take;
        this.take = null;
        promise.set((Object)item);
        return item == null ? this.writer.accept(null) : Promise.complete();
    }

    @Override
    public Promise<ByteBuf> take() {
        SettablePromise promise;
        if (CHECKS) {
            Reactive.checkInReactorThread((Reactive)this);
        }
        if (this.exception != null) {
            return Promise.ofException((Exception)this.exception);
        }
        if (!this.isExhausted()) {
            return this.reader.get();
        }
        if (this.finished) {
            return Promise.of(null);
        }
        this.take = promise = new SettablePromise();
        return promise;
    }

    @Override
    public boolean isSaturated() {
        return false;
    }

    @Override
    public boolean isExhausted() {
        return this.reader.getPosition() >= this.writer.getPosition();
    }

    public void closeEx(Exception e) {
        Reactive.checkInReactorThread((Reactive)this);
        if (this.exception != null) {
            return;
        }
        this.exception = e;
        this.writer.closeEx(e);
        this.reader.closeEx(e);
        if (this.take != null) {
            this.take.setException(e);
            this.take = null;
        }
        this.executor.execute(() -> {
            try {
                Files.deleteIfExists(this.path);
            }
            catch (IOException io) {
                logger.error("failed to cleanup channel buffer file " + this.path, (Throwable)io);
            }
        });
    }

    @Nullable
    public Exception getException() {
        return this.exception;
    }
}

