package org.apache.distributedlog;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/AppendOnlyStreamWriter.class */
public class AppendOnlyStreamWriter implements Closeable {
    static final Logger LOG = LoggerFactory.getLogger(AppendOnlyStreamWriter.class);
    final long[] syncPos = new long[1];
    BKAsyncLogWriter logWriter;
    long requestPos;

    /* loaded from: input_file:org/apache/distributedlog/AppendOnlyStreamWriter$WriteCompleteListener.class */
    class WriteCompleteListener implements FutureEventListener<DLSN> {
        private final long position;

        public WriteCompleteListener(long j) {
            this.position = j;
        }

        @Override // org.apache.distributedlog.common.concurrent.FutureEventListener
        public void onSuccess(DLSN dlsn) {
            synchronized (AppendOnlyStreamWriter.this.syncPos) {
                if (this.position > AppendOnlyStreamWriter.this.syncPos[0]) {
                    AppendOnlyStreamWriter.this.syncPos[0] = this.position;
                }
            }
        }

        @Override // org.apache.distributedlog.common.concurrent.FutureEventListener
        public void onFailure(Throwable th) {
        }
    }

    public AppendOnlyStreamWriter(BKAsyncLogWriter bKAsyncLogWriter, long j) {
        this.requestPos = 0L;
        LOG.debug("initialize at position {}", Long.valueOf(j));
        this.logWriter = bKAsyncLogWriter;
        this.syncPos[0] = j;
        this.requestPos = j;
    }

    public CompletableFuture<DLSN> write(byte[] bArr) {
        this.requestPos += bArr.length;
        return this.logWriter.write(new LogRecord(this.requestPos, bArr)).whenComplete((BiConsumer<? super DLSN, ? super Throwable>) new WriteCompleteListener(this.requestPos));
    }

    public void force(boolean z) throws IOException {
        try {
            long longValue = ((Long) FutureUtils.result(this.logWriter.flushAndCommit())).longValue();
            synchronized (this.syncPos) {
                this.syncPos[0] = longValue;
            }
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            LOG.error("unexpected exception in AppendOnlyStreamWriter.force ", e2);
            throw new UnexpectedException("unexpected exception in AppendOnlyStreamWriter.force", e2);
        }
    }

    public long position() {
        long j;
        synchronized (this.syncPos) {
            j = this.syncPos[0];
        }
        return j;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.logWriter.closeAndComplete();
    }

    public void markEndOfStream() throws IOException {
        try {
            FutureUtils.result(this.logWriter.markEndOfStream());
        } catch (IOException e) {
            throw e;
        } catch (Exception e2) {
            throw new UnexpectedException("Mark end of stream hit unexpected exception", e2);
        }
    }
}
