package org.rostore.v2.container.async;

import java.lang.AutoCloseable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.rostore.entity.Record;
import org.rostore.entity.RoStoreException;
import org.rostore.entity.StreamProcessingException;

/* loaded from: input_file:org/rostore/v2/container/async/AsyncStream.class */
/**
 * Wraps a closeable stream together with the life-cycle state of the single
 * asynchronous operation that is executed on it.
 *
 * <p>The wrapper starts in {@link AsyncStatus#OPENED}. The operation itself is
 * executed by {@link #processFunction(AsyncFunction)}, which moves the status to
 * {@link AsyncStatus#STARTED} and then to {@link AsyncStatus#SUCCESS} or
 * {@link AsyncStatus#ERROR}. Every status change is reported to the optional
 * {@link AsyncListener}.
 *
 * <p>Instances created with {@code wrapBlocking(..)} own a latch, so other threads
 * can wait for completion through {@link #get()} / {@link #get(long, TimeUnit)}.
 * Instances created with {@code wrap(..)} have no latch and reject both
 * {@code get} variants.
 *
 * @param <S> type of the wrapped stream
 */
public class AsyncStream<S extends AutoCloseable> implements AutoCloseable, Future<S> {
    private final S stream;
    /** Optional observer of status changes, errors and records; may be {@code null}. */
    private final AsyncListener asyncListener;
    /** Present only for blocking streams; released once the operation reaches a final state. */
    private final CountDownLatch countDownLatch;
    // Written by the processing thread and read by waiting/polling threads, hence volatile.
    private volatile Exception exception;
    private volatile AsyncStatus status = AsyncStatus.OPENED;

    private AsyncStream(S s, boolean blocking, AsyncListener asyncListener) {
        this.stream = s;
        this.asyncListener = asyncListener;
        this.countDownLatch = blocking ? new CountDownLatch(1) : null;
        // The listener is told about the initial OPENED status right away.
        notifyStatus();
    }

    /**
     * Creates a non-blocking wrapper without a listener.
     *
     * @param s the stream to wrap
     * @param <S> type of the wrapped stream
     * @return the new wrapper
     */
    public static <S extends AutoCloseable> AsyncStream<S> wrap(S s) {
        return wrap(s, null);
    }

    /**
     * Creates a non-blocking wrapper; {@link #get()} is not supported on it.
     *
     * @param s the stream to wrap
     * @param asyncListener listener to notify, or {@code null}
     * @param <S> type of the wrapped stream
     * @return the new wrapper
     */
    public static <S extends AutoCloseable> AsyncStream<S> wrap(S s, AsyncListener asyncListener) {
        return new AsyncStream<>(s, false, asyncListener);
    }

    /**
     * Creates a blocking wrapper without a listener.
     *
     * @param s the stream to wrap
     * @param <S> type of the wrapped stream
     * @return the new wrapper
     */
    public static <S extends AutoCloseable> AsyncStream<S> wrapBlocking(S s) {
        return wrapBlocking(s, null);
    }

    /**
     * Creates a blocking wrapper whose completion can be awaited with {@link #get()}.
     *
     * @param s the stream to wrap
     * @param asyncListener listener to notify, or {@code null}
     * @param <S> type of the wrapped stream
     * @return the new wrapper
     */
    public static <S extends AutoCloseable> AsyncStream<S> wrapBlocking(S s, AsyncListener asyncListener) {
        return new AsyncStream<>(s, true, asyncListener);
    }

    /**
     * @return the exception recorded by a failed {@link #processFunction(AsyncFunction)}
     *         or by {@link #fail(Exception)}, or {@code null} if none was recorded
     */
    public Exception getException() {
        return this.exception;
    }

    /**
     * Forwards a record to the listener, if one is registered.
     *
     * @param record the record to report
     */
    public void notifyRecord(Record record) {
        if (this.asyncListener != null) {
            this.asyncListener.record(record);
        }
    }

    /**
     * Runs the given function on the wrapped stream and tracks its outcome.
     *
     * <p>On success the status becomes {@link AsyncStatus#SUCCESS}. On any exception
     * the status becomes {@link AsyncStatus#ERROR}, the exception is stored, the
     * listener is informed and a {@link StreamProcessingException} wrapping the
     * cause is thrown. In both cases waiting threads are released afterwards.
     *
     * @param asyncFunction the operation to execute on the stream
     * @throws RoStoreException if the stream is not in the opened state
     * @throws StreamProcessingException if the function (or the success notification) fails
     */
    public final void processFunction(AsyncFunction<S> asyncFunction) {
        start();
        try {
            asyncFunction.process(this.stream);
            this.status = AsyncStatus.SUCCESS;
            notifyStatus();
        } catch (Exception e) {
            // Store the cause before publishing ERROR so that an observer of the
            // status never sees ERROR together with a null exception.
            this.exception = e;
            this.status = AsyncStatus.ERROR;
            StreamProcessingException streamProcessingException = new StreamProcessingException(e);
            if (this.asyncListener != null) {
                this.asyncListener.error(e);
                this.asyncListener.status(this.status);
            }
            throw streamProcessingException;
        } finally {
            releaseWaiters();
        }
    }

    /**
     * Marks the operation as failed from the outside.
     *
     * <p>The exception is stored so that {@link #getException()} and the {@code get}
     * methods report it, the listener is informed, and threads blocked in
     * {@code get} are released even if the listener itself throws.
     *
     * @param exc the reason for the failure
     */
    public void fail(Exception exc) {
        this.exception = exc;
        this.status = AsyncStatus.ERROR;
        try {
            if (this.asyncListener != null) {
                this.asyncListener.error(exc);
                this.asyncListener.status(this.status);
            }
        } finally {
            // Without this a blocking get() would wait forever after a failure.
            releaseWaiters();
        }
    }

    /**
     * Marks the operation as cancelled and releases waiting threads.
     *
     * <p>The flag is ignored: a running {@link #processFunction(AsyncFunction)} is
     * not interrupted by this call.
     *
     * @param mayInterruptIfRunning ignored
     * @return {@code false} if {@link #isDone()} already reported completion (nothing
     *         is changed in that case), {@code true} otherwise
     */
    @Override // java.util.concurrent.Future
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (isDone()) {
            // Do not overwrite a final status or re-notify the listener.
            return false;
        }
        this.status = AsyncStatus.CANCELED;
        notifyStatus();
        releaseWaiters();
        return true;
    }

    @Override // java.util.concurrent.Future
    public boolean isCancelled() {
        return this.status == AsyncStatus.CANCELED;
    }

    /**
     * @return whatever {@code isFinished()} reports for the current status
     */
    @Override // java.util.concurrent.Future
    public boolean isDone() {
        return this.status.isFinished();
    }

    /**
     * Waits up to the given time for the operation to finish.
     *
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @return the wrapped stream
     * @throws RoStoreException if the stream is non-blocking, the wait timed out,
     *         or the waiting thread was interrupted
     * @throws AsyncException if an exception has been recorded for the operation
     */
    @Override // java.util.concurrent.Future
    public S get(long timeout, TimeUnit timeUnit) {
        requireBlocking();
        boolean finished;
        try {
            finished = this.countDownLatch.await(timeout, timeUnit);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RoStoreException("Interrupted while waiting for stream", e);
        }
        if (!finished) {
            // await() returned false: the latch was not released within the timeout.
            throw new RoStoreException("Timed out while waiting for stream");
        }
        return completedStream();
    }

    /**
     * Waits until the operation has finished.
     *
     * @return the wrapped stream
     * @throws RoStoreException if the stream is non-blocking or the waiting thread
     *         was interrupted
     * @throws AsyncException if an exception has been recorded for the operation
     */
    @Override // java.util.concurrent.Future
    public S get() {
        requireBlocking();
        try {
            this.countDownLatch.await();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RoStoreException("Interrupted while waiting for stream", e);
        }
        return completedStream();
    }

    /**
     * Closes the wrapped stream.
     *
     * @throws Exception whatever the wrapped stream's {@code close()} throws
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.stream.close();
    }

    /**
     * Marks a stream that has never been started as successfully finished
     * and releases waiting threads.
     *
     * @throws RoStoreException if the stream is not in the opened state
     */
    public void empty() {
        if (this.status != AsyncStatus.OPENED) {
            throw new RoStoreException("Try to mark the stream as empty after it has been started.");
        }
        this.status = AsyncStatus.SUCCESS;
        releaseWaiters();
        notifyStatus();
    }

    /** Moves the stream from OPENED to STARTED, rejecting any other starting state. */
    private void start() {
        if (this.status != AsyncStatus.OPENED) {
            throw new RoStoreException("The stream is not in opened state.");
        }
        this.status = AsyncStatus.STARTED;
        notifyStatus();
    }

    /** Reports the current status to the listener, if one is registered. */
    private void notifyStatus() {
        if (this.asyncListener != null) {
            this.asyncListener.status(this.status);
        }
    }

    /** Releases threads blocked in {@code get}; a no-op for non-blocking streams. */
    private void releaseWaiters() {
        if (this.countDownLatch != null) {
            this.countDownLatch.countDown();
        }
    }

    /** Rejects waiting on a stream that was created without a latch. */
    private void requireBlocking() {
        if (this.countDownLatch == null) {
            throw new RoStoreException("Can't wait for a non-blocking stream");
        }
    }

    /** Returns the stream, or rethrows a recorded failure wrapped in an {@link AsyncException}. */
    private S completedStream() {
        Exception failure = this.exception;
        if (failure != null) {
            throw new AsyncException(failure);
        }
        return this.stream;
    }
}
