package com.twitter.distributedlog.basic;

import com.google.common.base.Charsets;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogRecordWithDLSN;
import com.twitter.distributedlog.exceptions.LogEmptyException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
import com.twitter.util.FutureEventListener;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang.StringUtils;

/* loaded from: input_file:com/twitter/distributedlog/basic/MultiReader.class */
public class MultiReader {
    private static final String HELP = "TailReader <uri> <stream-1>[,<stream-2>,...,<stream-n>]";

    public static void main(String[] strArr) throws Exception {
        if (2 != strArr.length) {
            System.out.println(HELP);
            return;
        }
        String str = strArr[0];
        String str2 = strArr[1];
        DistributedLogNamespace build = DistributedLogNamespaceBuilder.newBuilder().conf(new DistributedLogConfiguration()).uri(URI.create(str)).build();
        String[] split = StringUtils.split(str2, ',');
        DistributedLogManager[] distributedLogManagerArr = new DistributedLogManager[split.length];
        for (int i = 0; i < distributedLogManagerArr.length; i++) {
            String str3 = split[i];
            System.out.println("Opening log stream " + str3);
            distributedLogManagerArr[i] = build.openLog(str3);
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        for (final DistributedLogManager distributedLogManager : distributedLogManagerArr) {
            distributedLogManager.getLastLogRecordAsync().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { // from class: com.twitter.distributedlog.basic.MultiReader.1
                public void onFailure(Throwable th) {
                    if (th instanceof LogNotFoundException) {
                        System.err.println("Log stream " + distributedLogManager.getStreamName() + " is not found. Please create it first.");
                        countDownLatch.countDown();
                    } else if (th instanceof LogEmptyException) {
                        System.err.println("Log stream " + distributedLogManager.getStreamName() + " is empty.");
                        MultiReader.readLoop(distributedLogManager, DLSN.InitialDLSN, countDownLatch);
                    } else {
                        System.err.println("Encountered exception on process stream " + distributedLogManager.getStreamName());
                        countDownLatch.countDown();
                    }
                }

                public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                    MultiReader.readLoop(distributedLogManager, logRecordWithDLSN.getDlsn(), countDownLatch);
                }
            });
        }
        countDownLatch.await();
        for (DistributedLogManager distributedLogManager2 : distributedLogManagerArr) {
            distributedLogManager2.close();
        }
        build.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void readLoop(final DistributedLogManager distributedLogManager, DLSN dlsn, final CountDownLatch countDownLatch) {
        System.out.println("Wait for records from " + distributedLogManager.getStreamName() + " starting from " + dlsn);
        distributedLogManager.openAsyncLogReader(dlsn).addEventListener(new FutureEventListener<AsyncLogReader>() { // from class: com.twitter.distributedlog.basic.MultiReader.2
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + distributedLogManager.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(AsyncLogReader asyncLogReader) {
                System.out.println("Open reader to read records from stream " + asyncLogReader.getStreamName());
                MultiReader.readLoop(asyncLogReader, countDownLatch);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void readLoop(final AsyncLogReader asyncLogReader, final CountDownLatch countDownLatch) {
        asyncLogReader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() { // from class: com.twitter.distributedlog.basic.MultiReader.3
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + asyncLogReader.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                System.out.println("Received record " + logRecordWithDLSN.getDlsn() + " from stream " + asyncLogReader.getStreamName());
                System.out.println("\"\"\"");
                System.out.println(new String(logRecordWithDLSN.getPayload(), Charsets.UTF_8));
                System.out.println("\"\"\"");
                asyncLogReader.readNext().addEventListener(this);
            }
        });
    }
}
