package org.apache.distributedlog.client;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.finagle.IndividualRequestTimeoutException;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecordSet;
import org.apache.distributedlog.LogRecordSetBuffer;
import org.apache.distributedlog.client.speculative.DefaultSpeculativeRequestExecutionPolicy;
import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutionPolicy;
import org.apache.distributedlog.client.speculative.SpeculativeRequestExecutor;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.protocol.util.TwitterFutureUtils;
import org.apache.distributedlog.service.DistributedLogClient;

/* loaded from: input_file:org/apache/distributedlog/client/DistributedLogMultiStreamWriter.class */
public class DistributedLogMultiStreamWriter implements Runnable {
    private final int numStreams;
    private final List<String> streams;
    private final DistributedLogClient client;
    private final int bufferSize;
    private final long requestTimeoutMs;
    private final SpeculativeRequestExecutionPolicy speculativePolicy;
    private final Ticker clockTicker;
    private final CompressionCodec.Type codec;
    private final ScheduledExecutorService scheduler;
    private final boolean ownScheduler;
    private final AtomicInteger nextStreamId;
    private LogRecordSet.Writer recordSetWriter;

    /* loaded from: input_file:org/apache/distributedlog/client/DistributedLogMultiStreamWriter$Builder.class */
    public static class Builder {
        private DistributedLogClient client;
        private List<String> streams;
        private int bufferSize;
        private long flushIntervalMicros;
        private CompressionCodec.Type codec;
        private ScheduledExecutorService executorService;
        private long requestTimeoutMs;
        private int firstSpeculativeTimeoutMs;
        private int maxSpeculativeTimeoutMs;
        private float speculativeBackoffMultiplier;
        private Ticker ticker;

        private Builder() {
            this.client = null;
            this.streams = null;
            this.bufferSize = 16384;
            this.flushIntervalMicros = 2000L;
            this.codec = CompressionCodec.Type.NONE;
            this.executorService = null;
            this.requestTimeoutMs = 500L;
            this.firstSpeculativeTimeoutMs = 50;
            this.maxSpeculativeTimeoutMs = 200;
            this.speculativeBackoffMultiplier = 2.0f;
            this.ticker = Ticker.systemTicker();
        }

        public Builder client(DistributedLogClient distributedLogClient) {
            this.client = distributedLogClient;
            return this;
        }

        public Builder streams(List<String> list) {
            this.streams = list;
            return this;
        }

        public Builder bufferSize(int i) {
            this.bufferSize = i;
            return this;
        }

        public Builder flushIntervalMs(int i) {
            this.flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(i);
            return this;
        }

        public Builder flushIntervalMicros(int i) {
            this.flushIntervalMicros = i;
            return this;
        }

        public Builder compressionCodec(CompressionCodec.Type type) {
            this.codec = type;
            return this;
        }

        public Builder scheduler(ScheduledExecutorService scheduledExecutorService) {
            this.executorService = scheduledExecutorService;
            return this;
        }

        public Builder requestTimeoutMs(long j) {
            this.requestTimeoutMs = j;
            return this;
        }

        public Builder firstSpeculativeTimeoutMs(int i) {
            this.firstSpeculativeTimeoutMs = i;
            return this;
        }

        public Builder maxSpeculativeTimeoutMs(int i) {
            this.maxSpeculativeTimeoutMs = i;
            return this;
        }

        public Builder speculativeBackoffMultiplier(float f) {
            this.speculativeBackoffMultiplier = f;
            return this;
        }

        public Builder clockTicker(Ticker ticker) {
            this.ticker = ticker;
            return this;
        }

        public DistributedLogMultiStreamWriter build() {
            Preconditions.checkArgument((null == this.streams || this.streams.isEmpty()) ? false : true, "No streams provided");
            Preconditions.checkNotNull(this.client, "No distributedlog client provided");
            Preconditions.checkNotNull(this.codec, "No compression codec provided");
            Preconditions.checkArgument(this.firstSpeculativeTimeoutMs > 0 && this.firstSpeculativeTimeoutMs <= this.maxSpeculativeTimeoutMs && this.speculativeBackoffMultiplier > 0.0f && ((long) this.maxSpeculativeTimeoutMs) < this.requestTimeoutMs, "Invalid speculative timeout settings");
            return new DistributedLogMultiStreamWriter(this.streams, this.client, Math.min(this.bufferSize, 1044480), this.flushIntervalMicros, this.requestTimeoutMs, this.firstSpeculativeTimeoutMs, this.maxSpeculativeTimeoutMs, this.speculativeBackoffMultiplier, this.codec, this.ticker, this.executorService);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/client/DistributedLogMultiStreamWriter$PendingWriteRequest.class */
    public class PendingWriteRequest implements FutureEventListener<DLSN>, SpeculativeRequestExecutor {
        private final LogRecordSetBuffer recordSet;
        private final Stopwatch stopwatch;
        private int nextStream;
        private AtomicBoolean complete = new AtomicBoolean(false);
        private int numTriedStreams = 0;

        PendingWriteRequest(LogRecordSetBuffer logRecordSetBuffer) {
            this.stopwatch = Stopwatch.createStarted(DistributedLogMultiStreamWriter.this.clockTicker);
            this.recordSet = logRecordSetBuffer;
            this.nextStream = Math.abs(DistributedLogMultiStreamWriter.this.nextStreamId.incrementAndGet()) % DistributedLogMultiStreamWriter.this.numStreams;
        }

        synchronized String sendNextWrite() {
            long elapsed = this.stopwatch.elapsed(TimeUnit.MILLISECONDS);
            if (elapsed > DistributedLogMultiStreamWriter.this.requestTimeoutMs || this.numTriedStreams >= DistributedLogMultiStreamWriter.this.numStreams) {
                fail(new IndividualRequestTimeoutException(Duration.fromMilliseconds(elapsed)));
                return null;
            }
            try {
                String sendWriteToStream = sendWriteToStream(this.nextStream);
                this.nextStream = (this.nextStream + 1) % DistributedLogMultiStreamWriter.this.numStreams;
                this.numTriedStreams++;
                return sendWriteToStream;
            } catch (Throwable th) {
                this.nextStream = (this.nextStream + 1) % DistributedLogMultiStreamWriter.this.numStreams;
                this.numTriedStreams++;
                throw th;
            }
        }

        synchronized String sendWriteToStream(int i) {
            String stream = DistributedLogMultiStreamWriter.this.getStream(i);
            DistributedLogMultiStreamWriter.this.client.writeRecordSet(stream, this.recordSet).addEventListener(this);
            return stream;
        }

        public void onSuccess(DLSN dlsn) {
            if (this.complete.compareAndSet(false, true)) {
                this.recordSet.completeTransmit(dlsn.getLogSegmentSequenceNo(), dlsn.getEntryId(), dlsn.getSlotId());
            }
        }

        public void onFailure(Throwable th) {
            sendNextWrite();
        }

        private void fail(Throwable th) {
            if (this.complete.compareAndSet(false, true)) {
                this.recordSet.abortTransmit(th);
            }
        }

        @Override // org.apache.distributedlog.client.speculative.SpeculativeRequestExecutor
        public Future<Boolean> issueSpeculativeRequest() {
            return Future.value(Boolean.valueOf((this.complete.get() || null == sendNextWrite()) ? false : true));
        }
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private DistributedLogMultiStreamWriter(List<String> list, DistributedLogClient distributedLogClient, int i, long j, long j2, int i2, int i3, float f, CompressionCodec.Type type, Ticker ticker, ScheduledExecutorService scheduledExecutorService) {
        this.streams = Lists.newArrayList(list);
        this.numStreams = this.streams.size();
        this.client = distributedLogClient;
        this.bufferSize = i;
        this.requestTimeoutMs = j2;
        this.codec = type;
        this.clockTicker = ticker;
        if (null == scheduledExecutorService) {
            this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MultiStreamWriterFlushThread-%d").build());
            this.ownScheduler = true;
        } else {
            this.scheduler = scheduledExecutorService;
            this.ownScheduler = false;
        }
        this.speculativePolicy = new DefaultSpeculativeRequestExecutionPolicy(i2, i3, f);
        Collections.shuffle(this.streams);
        this.nextStreamId = new AtomicInteger(0);
        this.recordSetWriter = newRecordSetWriter();
        if (j > 0) {
            this.scheduler.scheduleAtFixedRate(this, j, j, TimeUnit.MICROSECONDS);
        }
    }

    String getStream(int i) {
        return this.streams.get(i);
    }

    synchronized LogRecordSet.Writer getLogRecordSetWriter() {
        return this.recordSetWriter;
    }

    private LogRecordSet.Writer newRecordSetWriter() {
        return LogRecordSet.newWriter(this.bufferSize, this.codec);
    }

    public synchronized Future<DLSN> write(ByteBuffer byteBuffer) {
        int remaining = byteBuffer.remaining();
        if (remaining > 1040384) {
            return Future.exception(new LogRecordTooLongException("Log record of size " + remaining + " written when only 1040384 is allowed"));
        }
        if (this.recordSetWriter.getNumBytes() + remaining > 1044480) {
            flush();
        }
        Promise promise = new Promise();
        try {
            this.recordSetWriter.writeRecord(byteBuffer, TwitterFutureUtils.newJFuture(promise));
            if (this.recordSetWriter.getNumBytes() >= this.bufferSize) {
                flush();
            }
            return promise;
        } catch (WriteException e) {
            this.recordSetWriter.abortTransmit(e);
            this.recordSetWriter = newRecordSetWriter();
            return Future.exception(e);
        } catch (LogRecordTooLongException e2) {
            return Future.exception(e2);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        flush();
    }

    private void flush() {
        synchronized (this) {
            if (this.recordSetWriter.getNumRecords() == 0) {
                return;
            }
            LogRecordSet.Writer writer = this.recordSetWriter;
            this.recordSetWriter = newRecordSetWriter();
            transmit(writer);
        }
    }

    private void transmit(LogRecordSet.Writer writer) {
        this.speculativePolicy.initiateSpeculativeRequest(this.scheduler, new PendingWriteRequest(writer));
    }

    public void close() {
        if (this.ownScheduler) {
            this.scheduler.shutdown();
        }
    }
}
