package com.twitter.distributedlog.basic;

import com.google.common.base.Charsets;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.CountDownLatch;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/twitter/distributedlog/basic/StreamRewinder.class */
public class StreamRewinder {
    private static final String HELP = "StreamRewinder <uri> <string> <seconds>";

    public static void main(String[] strArr) throws Exception {
        if (3 != strArr.length) {
            System.out.println(HELP);
            return;
        }
        String str = strArr[0];
        String str2 = strArr[1];
        int parseInt = Integer.parseInt(strArr[2]);
        DistributedLogNamespace build = DistributedLogNamespaceBuilder.newBuilder().conf(new DistributedLogConfiguration()).uri(URI.create(str)).build();
        System.out.println("Opening log stream " + str2);
        DistributedLogManager openLog = build.openLog(str2);
        try {
            readLoop(openLog, parseInt);
            openLog.close();
            build.close();
        } catch (Throwable th) {
            openLog.close();
            build.close();
            throw th;
        }
    }

    private static void readLoop(final DistributedLogManager distributedLogManager, int i) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        long currentTimeMillis = System.currentTimeMillis() - TimeUnit.MILLISECONDS.convert(i, TimeUnit.SECONDS);
        System.out.println("Record records starting from " + currentTimeMillis + " which is " + i + " seconds ago");
        final AsyncLogReader asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(currentTimeMillis));
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        asyncLogReader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { // from class: com.twitter.distributedlog.basic.StreamRewinder.1
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + distributedLogManager.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                System.out.println("Received record " + logRecordWithDLSN.getDlsn());
                System.out.println("\"\"\"");
                System.out.println(new String(logRecordWithDLSN.getPayload(), Charsets.UTF_8));
                System.out.println("\"\"\"");
                long currentTimeMillis2 = System.currentTimeMillis() - logRecordWithDLSN.getTransactionId();
                if (!atomicBoolean.get() && currentTimeMillis2 < 2000) {
                    System.out.println("Reader caught with latest data");
                    atomicBoolean.set(true);
                }
                asyncLogReader.readNext().addEventListener(this);
            }
        });
        countDownLatch.await();
        FutureUtils.result(asyncLogReader.asyncClose(), Duration.apply(5L, TimeUnit.SECONDS));
    }
}
