package com.twitter.distributedlog.admin;

import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DLMTestUtil;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.DistributedLogManagerFactory;
import com.twitter.distributedlog.LogRecord;
import com.twitter.distributedlog.TestDistributedLogBase;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.TimeoutException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/admin/TestDistributedLogAdmin.class */
public class TestDistributedLogAdmin extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogAdmin.class);
    private ZooKeeperClient zooKeeperClient;

    @Override // com.twitter.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        this.zooKeeperClient = ZooKeeperClientBuilder.newBuilder().uri(createDLMURI("/")).sessionTimeoutMs(10000).zkAclId((String) null).build();
        conf.setTraceReadAheadMetadataChanges(true);
    }

    @Override // com.twitter.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        this.zooKeeperClient.close();
    }

    @Test(timeout = 60000)
    public void testChangeSequenceNumber() throws Exception {
        DistributedLogConfiguration distributedLogConfiguration = new DistributedLogConfiguration();
        distributedLogConfiguration.addConfiguration(conf);
        distributedLogConfiguration.setLogSegmentSequenceNumberValidationEnabled(false);
        URI createDLMURI = createDLMURI("/change-sequence-number");
        this.zooKeeperClient.get().create(createDLMURI.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        DistributedLogManagerFactory distributedLogManagerFactory = new DistributedLogManagerFactory(distributedLogConfiguration, createDLMURI);
        DistributedLogManager createDistributedLogManagerWithSharedClients = distributedLogManagerFactory.createDistributedLogManagerWithSharedClients("change-sequence-number");
        DLMTestUtil.generateCompletedLogSegments(createDistributedLogManagerWithSharedClients, distributedLogConfiguration, 4L, 10L);
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(createDistributedLogManagerWithSharedClients, distributedLogConfiguration, 5L, 41L, false, 10L, true);
        createDistributedLogManagerWithSharedClients.close();
        DistributedLogManager createDistributedLogManagerWithSharedClients2 = distributedLogManagerFactory.createDistributedLogManagerWithSharedClients("change-sequence-number");
        AsyncLogReader asyncLogReader = createDistributedLogManagerWithSharedClients2.getAsyncLogReader(DLSN.InitialDLSN);
        long j = 1;
        for (int i = 0; i < 40; i++) {
            LogRecord logRecord = (LogRecord) Await.result(asyncLogReader.readNext());
            Assert.assertNotNull(logRecord);
            DLMTestUtil.verifyLogRecord(logRecord);
            Assert.assertEquals(j, logRecord.getTransactionId());
            j++;
        }
        DistributedLogManager createDistributedLogManagerWithSharedClients3 = distributedLogManagerFactory.createDistributedLogManagerWithSharedClients("change-sequence-number");
        DLMTestUtil.injectLogSegmentWithGivenLogSegmentSeqNo(createDistributedLogManagerWithSharedClients3, distributedLogConfiguration, 3L, 51L, true, 10L, false);
        TimeUnit.SECONDS.sleep(2L);
        DLSN lastDLSN = createDistributedLogManagerWithSharedClients2.getLastDLSN();
        Assert.assertTrue(lastDLSN.compareTo(new DLSN(5L, Long.MIN_VALUE, Long.MIN_VALUE)) < 0);
        Assert.assertTrue(lastDLSN.compareTo(new DLSN(4L, -1L, Long.MIN_VALUE)) > 0);
        Future readNext = asyncLogReader.readNext();
        try {
            Await.result(readNext, Duration.fromMilliseconds(1000L));
            Assert.fail("Should fail reading next when there is a corrupted log segment");
        } catch (TimeoutException e) {
        }
        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(distributedLogManagerFactory, new DryrunLogSegmentMetadataStoreUpdater(distributedLogConfiguration, getLogSegmentMetadataStore(distributedLogManagerFactory)), "change-sequence-number", false, false);
        TimeUnit.SECONDS.sleep(2L);
        DLSN lastDLSN2 = createDistributedLogManagerWithSharedClients2.getLastDLSN();
        Assert.assertTrue(lastDLSN2.compareTo(new DLSN(5L, Long.MIN_VALUE, Long.MIN_VALUE)) < 0);
        Assert.assertTrue(lastDLSN2.compareTo(new DLSN(4L, -1L, Long.MIN_VALUE)) > 0);
        try {
            Await.result(readNext, Duration.fromMilliseconds(1000L));
            Assert.fail("Should fail reading next when there is a corrupted log segment");
        } catch (TimeoutException e2) {
        }
        DistributedLogAdmin.fixInprogressSegmentWithLowerSequenceNumber(distributedLogManagerFactory, LogSegmentMetadataStoreUpdater.createMetadataUpdater(distributedLogConfiguration, getLogSegmentMetadataStore(distributedLogManagerFactory)), "change-sequence-number", false, false);
        TimeUnit.SECONDS.sleep(2L);
        LogRecord logRecord2 = (LogRecord) Await.result(readNext);
        Assert.assertNotNull(logRecord2);
        DLMTestUtil.verifyLogRecord(logRecord2);
        Assert.assertEquals(51L, logRecord2.getTransactionId());
        long j2 = 51 + 1;
        for (int i2 = 1; i2 < 10; i2++) {
            LogRecord logRecord3 = (LogRecord) Await.result(asyncLogReader.readNext());
            Assert.assertNotNull(logRecord3);
            DLMTestUtil.verifyLogRecord(logRecord3);
            Assert.assertEquals(j2, logRecord3.getTransactionId());
            j2++;
        }
        DLSN lastDLSN3 = createDistributedLogManagerWithSharedClients2.getLastDLSN();
        LOG.info("LastDLSN after fix inprogress segment : {}", lastDLSN3);
        Assert.assertTrue(lastDLSN3.compareTo(new DLSN(7L, Long.MIN_VALUE, Long.MIN_VALUE)) < 0);
        Assert.assertTrue(lastDLSN3.compareTo(new DLSN(6L, -1L, Long.MIN_VALUE)) > 0);
        Utils.close(asyncLogReader);
        createDistributedLogManagerWithSharedClients2.close();
        createDistributedLogManagerWithSharedClients3.close();
        distributedLogManagerFactory.close();
    }
}
