/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata;

import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.LeaderElection;
import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for session-event notification on a ZooKeeper-backed metadata store, and for how
 * locks and leader election behave when the ZooKeeper session is expired.
 *
 * <p>Every test creates its own store against the {@code zks} server provided by
 * {@link BaseMetadataStoreTest} and releases it with try-with-resources, so that a failure
 * while closing a resource is recorded as a suppressed exception instead of replacing the
 * assertion failure that caused the test to fail.
 */
public class ZKSessionTest
extends BaseMetadataStoreTest {
    /**
     * Stops and restarts the server and checks the events delivered to a session listener.
     *
     * <p>Expected sequence: {@code ConnectionLost} within 5 s of stopping the server,
     * {@code Reconnected} within 20 s of restarting it, and then no further event for 5 s.
     * The store is created with a 30000 ms session timeout.
     *
     * @throws Exception if the store or the server fails, or a wait is interrupted
     */
    @Test
    public void testDisconnection() throws Exception {
        MetadataStoreConfig config = MetadataStoreConfig.builder().sessionTimeoutMillis(30000).build();
        try (MetadataStoreExtended store = MetadataStoreExtended.create(this.zks.getConnectionString(), config)) {
            LinkedBlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
            store.registerSessionListener(sessionEvents::add);

            this.zks.stop();
            SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.ConnectionLost);

            this.zks.start();
            e = sessionEvents.poll(20, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.Reconnected);

            // Nothing else may be reported after the reconnection.
            e = sessionEvents.poll(5, TimeUnit.SECONDS);
            Assert.assertNull(e);
        }
    }

    /**
     * Keeps the server stopped until the session is reported lost, then restarts it.
     *
     * <p>Expected sequence: {@code ConnectionLost} (within 5 s) and {@code SessionLost}
     * (within 10 s) while the server is down; {@code Reconnected} and
     * {@code SessionReestablished} (within 10 s each) after it is restarted; then no
     * further event for 1 s. The store is created with a 10000 ms session timeout.
     *
     * @throws Exception if the store or the server fails, or a wait is interrupted
     */
    @Test
    public void testSessionLost() throws Exception {
        MetadataStoreConfig config = MetadataStoreConfig.builder().sessionTimeoutMillis(10000).build();
        try (MetadataStoreExtended store = MetadataStoreExtended.create(this.zks.getConnectionString(), config)) {
            LinkedBlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
            store.registerSessionListener(sessionEvents::add);

            this.zks.stop();
            SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.ConnectionLost);
            e = sessionEvents.poll(10, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.SessionLost);

            this.zks.start();
            e = sessionEvents.poll(10, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.Reconnected);
            e = sessionEvents.poll(10, TimeUnit.SECONDS);
            Assert.assertEquals(e, SessionEvent.SessionReestablished);

            // Nothing else may be reported once the session is re-established.
            e = sessionEvents.poll(1, TimeUnit.SECONDS);
            Assert.assertNull(e);
        }
    }

    /**
     * Expires the session while a lock is held and checks that the lock survives.
     *
     * <p>After the four session events ({@code ConnectionLost}, {@code SessionLost},
     * {@code Reconnected}, {@code SessionReestablished}) have been observed, the test waits
     * until the lock path is present in the store again and asserts that the lock's
     * expiration future has not completed.
     *
     * @throws Exception if the store or the server fails, or a wait is interrupted
     */
    @Test
    public void testReacquireLocksAfterSessionLost() throws Exception {
        MetadataStoreConfig config = MetadataStoreConfig.builder().sessionTimeoutMillis(2000).build();
        try (MetadataStoreExtended store = MetadataStoreExtended.create(this.zks.getConnectionString(), config)) {
            // Register the test listener before the coordination service is created,
            // keeping the same registration order as before.
            LinkedBlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
            store.registerSessionListener(sessionEvents::add);

            // Resources are closed in reverse order: lock manager, then coordination service.
            try (CoordinationServiceImpl coordinationService = new CoordinationServiceImpl(store);
                 LockManager<String> lm1 = coordinationService.getLockManager(String.class)) {
                String path = this.newKey();
                ResourceLock<String> lock = lm1.acquireLock(path, "value-1").join();

                this.zks.expireSession(((ZKMetadataStore) store).getZkSessionId());

                SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.ConnectionLost);
                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.SessionLost);
                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.Reconnected);
                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.SessionReestablished);

                // The lock path has to show up in the store again, and the lock itself
                // must not have been flagged as expired.
                Awaitility.await().untilAsserted(() -> Assert.assertTrue(store.get(path).join().isPresent()));
                Assert.assertFalse(lock.getLockExpiredFuture().isDone());
            }
        }
    }

    /**
     * Expires the session while leadership is held and checks that leadership is kept.
     *
     * <p>The test first elects itself leader and verifies both the reported state and the
     * state-change notification. It then expires the session and asserts that the election
     * reports {@code Leading} after {@code SessionLost} and again after
     * {@code SessionReestablished}, and that the election path is present in the store.
     *
     * @throws Exception if the store or the server fails, or a wait is interrupted
     */
    @Test
    public void testReacquireLeadershipAfterSessionLost() throws Exception {
        MetadataStoreConfig config = MetadataStoreConfig.builder().sessionTimeoutMillis(2000).build();
        try (MetadataStoreExtended store = MetadataStoreExtended.create(this.zks.getConnectionString(), config)) {
            // Register the test listener before the coordination service is created,
            // keeping the same registration order as before.
            LinkedBlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
            store.registerSessionListener(sessionEvents::add);
            LinkedBlockingQueue<LeaderElectionState> leaderElectionEvents = new LinkedBlockingQueue<>();
            String path = this.newKey();

            // Resources are closed in reverse order: leader election, then coordination service.
            try (CoordinationServiceImpl coordinationService = new CoordinationServiceImpl(store);
                 LeaderElection<String> le1 =
                         coordinationService.getLeaderElection(String.class, path, leaderElectionEvents::add)) {
                le1.elect("value-1").join();
                Assert.assertEquals(le1.getState(), LeaderElectionState.Leading);
                LeaderElectionState les = leaderElectionEvents.poll(5, TimeUnit.SECONDS);
                Assert.assertEquals(les, LeaderElectionState.Leading);

                this.zks.expireSession(((ZKMetadataStore) store).getZkSessionId());

                SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.ConnectionLost);
                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.SessionLost);

                // Leadership is expected to be reported while the session is lost ...
                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(le1.getState(), LeaderElectionState.Leading));

                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.Reconnected);
                e = sessionEvents.poll(10, TimeUnit.SECONDS);
                Assert.assertEquals(e, SessionEvent.SessionReestablished);

                // ... and again once the session has been re-established.
                Awaitility.await().untilAsserted(
                        () -> Assert.assertEquals(le1.getState(), LeaderElectionState.Leading));
                Assert.assertTrue(store.get(path).join().isPresent());
            }
        }
    }
}

