package org.apache.distributedlog.basic;

import com.google.common.base.Charsets;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.exceptions.LogEmptyException;
import org.apache.distributedlog.exceptions.LogNotFoundException;

/* loaded from: input_file:org/apache/distributedlog/basic/TailReader.class */
public class TailReader {
    private static final String HELP = "TailReader <uri> <string>";

    public static void main(String[] strArr) throws Exception {
        if (2 != strArr.length) {
            System.out.println(HELP);
            return;
        }
        String str = strArr[0];
        String str2 = strArr[1];
        Namespace build = NamespaceBuilder.newBuilder().conf(new DistributedLogConfiguration()).uri(URI.create(str)).build();
        System.out.println("Opening log stream " + str2);
        DistributedLogManager openLog = build.openLog(str2);
        try {
            try {
                readLoop(openLog, openLog.getLastLogRecord().getDlsn());
                openLog.close();
                build.close();
            } catch (LogEmptyException e) {
                System.err.println("Log stream " + str2 + " is empty.");
                readLoop(openLog, DLSN.InitialDLSN);
                openLog.close();
                build.close();
            } catch (LogNotFoundException e2) {
                System.err.println("Log stream " + str2 + " is not found. Please create it first.");
                openLog.close();
                build.close();
            }
        } catch (Throwable th) {
            openLog.close();
            build.close();
            throw th;
        }
    }

    private static void readLoop(final DistributedLogManager distributedLogManager, DLSN dlsn) throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        System.out.println("Wait for records starting from " + dlsn);
        final AsyncLogReader asyncLogReader = (AsyncLogReader) FutureUtils.result(distributedLogManager.openAsyncLogReader(dlsn));
        asyncLogReader.readNext().whenComplete((BiConsumer) new FutureEventListener<LogRecordWithDLSN>() { // from class: org.apache.distributedlog.basic.TailReader.1
            public void onFailure(Throwable th) {
                System.err.println("Encountered error on reading records from stream " + distributedLogManager.getStreamName());
                th.printStackTrace(System.err);
                countDownLatch.countDown();
            }

            public void onSuccess(LogRecordWithDLSN logRecordWithDLSN) {
                System.out.println("Received record " + logRecordWithDLSN.getDlsn());
                System.out.println("\"\"\"");
                System.out.println(new String(logRecordWithDLSN.getPayload(), Charsets.UTF_8));
                System.out.println("\"\"\"");
                asyncLogReader.readNext().whenComplete((BiConsumer) this);
            }
        });
        countDownLatch.await();
        FutureUtils.result(asyncLogReader.asyncClose(), 5L, TimeUnit.SECONDS);
    }
}
