package org.apache.distributedlog.benchmark;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/benchmark/DLWriterWorker.class */
public class DLWriterWorker implements Worker {
    private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class);
    static final int BACKOFF_MS = 200;
    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final int writeConcurrency;
    final int messageSizeBytes;
    final ExecutorService executorService;
    final ScheduledExecutorService rescueService;
    final ShiftableRateLimiter rateLimiter;
    final Random random;
    final Namespace namespace;
    final List<DistributedLogManager> dlms;
    final List<AsyncLogWriter> streamWriters;
    final int numStreams;
    volatile boolean running = true;
    final StatsLogger statsLogger;
    final OpStatsLogger requestStat;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/benchmark/DLWriterWorker$Writer.class */
    public class Writer implements Runnable {
        final int idx;

        Writer(int i) {
            this.idx = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            DLWriterWorker.LOG.info("Started writer {}.", Integer.valueOf(this.idx));
            while (DLWriterWorker.this.running) {
                final int nextInt = DLWriterWorker.this.random.nextInt(DLWriterWorker.this.numStreams);
                final AsyncLogWriter asyncLogWriter = DLWriterWorker.this.streamWriters.get(nextInt);
                DLWriterWorker.this.rateLimiter.getLimiter().acquire();
                final long currentTimeMillis = System.currentTimeMillis();
                try {
                    asyncLogWriter.write(new LogRecord(currentTimeMillis, Utils.generateMessage(currentTimeMillis, DLWriterWorker.this.messageSizeBytes))).whenComplete((BiConsumer) new FutureEventListener<DLSN>() { // from class: org.apache.distributedlog.benchmark.DLWriterWorker.Writer.1
                        public void onSuccess(DLSN dlsn) {
                            DLWriterWorker.this.requestStat.registerSuccessfulEvent(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
                        }

                        public void onFailure(Throwable th) {
                            DLWriterWorker.this.requestStat.registerFailedEvent(System.currentTimeMillis() - currentTimeMillis, TimeUnit.MILLISECONDS);
                            DLWriterWorker.LOG.error("Failed to publish, rescue it : ", th);
                            DLWriterWorker.this.scheduleRescue(nextInt, asyncLogWriter, 0);
                        }
                    });
                } catch (TException e) {
                    DLWriterWorker.LOG.error("Error on generating message : ", e);
                    return;
                }
            }
        }
    }

    public DLWriterWorker(DistributedLogConfiguration distributedLogConfiguration, URI uri, String str, int i, int i2, ShiftableRateLimiter shiftableRateLimiter, int i3, int i4, StatsLogger statsLogger) throws IOException {
        Preconditions.checkArgument(i <= i2);
        this.streamPrefix = str;
        this.startStreamId = i;
        this.endStreamId = i2;
        this.rateLimiter = shiftableRateLimiter;
        this.writeConcurrency = i3;
        this.messageSizeBytes = i4;
        this.statsLogger = statsLogger;
        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
        this.executorService = Executors.newCachedThreadPool();
        this.rescueService = Executors.newSingleThreadScheduledExecutor();
        this.random = new Random(System.currentTimeMillis());
        this.namespace = NamespaceBuilder.newBuilder().conf(distributedLogConfiguration).uri(uri).statsLogger(statsLogger.scope("dl")).build();
        this.numStreams = i2 - i;
        this.dlms = new ArrayList(this.numStreams);
        this.streamWriters = new ArrayList(this.numStreams);
        final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(this.numStreams);
        for (int i5 = i; i5 < i2; i5++) {
            final String format = String.format("%s_%d", str, Integer.valueOf(i5));
            final DistributedLogManager openLog = this.namespace.openLog(format);
            this.executorService.submit(new Runnable() { // from class: org.apache.distributedlog.benchmark.DLWriterWorker.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AsyncLogWriter startAsyncLogSegmentNonPartitioned = openLog.startAsyncLogSegmentNonPartitioned();
                        if (null != concurrentHashMap.putIfAbsent(format, startAsyncLogSegmentNonPartitioned)) {
                            FutureUtils.result(startAsyncLogSegmentNonPartitioned.asyncClose());
                        }
                        countDownLatch.countDown();
                    } catch (Exception e) {
                        DLWriterWorker.LOG.error("Failed to intialize writer for stream : {}", format, e);
                    }
                }
            });
            this.dlms.add(openLog);
        }
        try {
            countDownLatch.await();
            for (int i6 = i; i6 < i2; i6++) {
                String format2 = String.format("%s_%d", str, Integer.valueOf(i6));
                AsyncLogWriter asyncLogWriter = (AsyncLogWriter) concurrentHashMap.get(format2);
                if (null == asyncLogWriter) {
                    throw new IOException("Writer for " + format2 + " never initialized.");
                }
                this.streamWriters.add(asyncLogWriter);
            }
            LOG.info("Writing to {} streams.", Integer.valueOf(this.numStreams));
        } catch (InterruptedException e) {
            throw new IOException("Interrupted on initializing writers for streams.", e);
        }
    }

    void rescueWriter(int i, AsyncLogWriter asyncLogWriter) {
        if (this.streamWriters.get(i) != asyncLogWriter) {
            LOG.warn("AsyncLogWriter for stream {} was already rescued.", Integer.valueOf(i));
            return;
        }
        try {
            FutureUtils.result(asyncLogWriter.asyncClose());
        } catch (Exception e) {
            LOG.error("Failed to close writer for stream {}.", Integer.valueOf(i));
        }
        AsyncLogWriter asyncLogWriter2 = null;
        try {
            asyncLogWriter2 = this.dlms.get(i).startAsyncLogSegmentNonPartitioned();
        } catch (IOException e2) {
            LOG.error("Failed to create new writer for stream {}, backoff for {} ms.", Integer.valueOf(i), Integer.valueOf(BACKOFF_MS));
            scheduleRescue(i, asyncLogWriter, BACKOFF_MS);
        }
        this.streamWriters.set(i, asyncLogWriter2);
    }

    void scheduleRescue(final int i, final AsyncLogWriter asyncLogWriter, int i2) {
        Runnable runnable = new Runnable() { // from class: org.apache.distributedlog.benchmark.DLWriterWorker.2
            @Override // java.lang.Runnable
            public void run() {
                DLWriterWorker.this.rescueWriter(i, asyncLogWriter);
            }
        };
        if (i2 > 0) {
            this.rescueService.schedule(runnable, i2, TimeUnit.MILLISECONDS);
        } else {
            this.rescueService.submit(runnable);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        SchedulerUtils.shutdownScheduler(this.executorService, 2L, TimeUnit.MINUTES);
        SchedulerUtils.shutdownScheduler(this.rescueService, 2L, TimeUnit.MINUTES);
        Iterator<AsyncLogWriter> it = this.streamWriters.iterator();
        while (it.hasNext()) {
            org.apache.distributedlog.util.Utils.ioResult(it.next().asyncClose());
        }
        Iterator<DistributedLogManager> it2 = this.dlms.iterator();
        while (it2.hasNext()) {
            it2.next().close();
        }
        this.namespace.close();
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})", new Object[]{Integer.valueOf(this.writeConcurrency), this.streamPrefix, Integer.valueOf(this.numStreams)});
        for (int i = 0; i < this.writeConcurrency; i++) {
            this.executorService.submit(new Writer(i));
        }
    }
}
