package org.apache.pulsar.metadata;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreTableView;
import org.apache.pulsar.metadata.cache.impl.JSONMetadataSerdeSimpleType;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.tableview.impl.MetadataStoreTableViewImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/metadata/MetadataStoreTableViewTest.class */
public class MetadataStoreTableViewTest extends BaseMetadataStoreTest {
    private static final Logger log = LoggerFactory.getLogger(MetadataStoreTableViewTest.class);
    LinkedBlockingDeque<Pair<String, Integer>> tails;
    LinkedBlockingDeque<Pair<String, Integer>> existings;

    @BeforeMethod
    void init() {
        this.tails = new LinkedBlockingDeque<>();
        this.existings = new LinkedBlockingDeque<>();
    }

    private void tailListener(String str, Integer num) {
        this.tails.add(Pair.of(str, num));
    }

    private void existingListener(String str, Integer num) {
        this.existings.add(Pair.of(str, num));
    }

    MetadataStoreTableViewImpl<Integer> createTestTableView(MetadataStore metadataStore, String str, Supplier<String> supplier) throws Exception {
        MetadataStoreTableViewImpl<Integer> build = MetadataStoreTableViewImpl.builder().name("test").clazz(Integer.class).store(metadataStore).pathPrefix(str).conflictResolver((num, num2) -> {
            return num == null || num2 == null || num.intValue() < num2.intValue();
        }).listenPathValidator(str2 -> {
            return str2.startsWith(str) && str2.contains("my");
        }).tailItemListeners(List.of(this::tailListener)).existingItemListeners(List.of(this::existingListener)).timeoutInMillis(5000L).build();
        build.start();
        return build;
    }

    private void assertGet(MetadataStoreTableViewImpl<Integer> metadataStoreTableViewImpl, String str, Integer num) {
        Assert.assertEquals((Integer) metadataStoreTableViewImpl.get(str), num);
    }

    @Test(dataProvider = "impl")
    public void emptyTableViewTest(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            Assert.assertFalse(createTestTableView.exists("non-existing-key"));
            Assert.assertFalse(createTestTableView.exists("non-existing-key/child"));
            Assert.assertNull(createTestTableView.get("non-existing-key"));
            Assert.assertNull(createTestTableView.get("non-existing-key/child"));
            try {
                createTestTableView.delete("non-existing-key").join();
                Assert.fail("Should have failed");
            } catch (CompletionException e) {
                assertException(e, (Class<?>) MetadataStoreException.NotFoundException.class);
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(dataProvider = "impl")
    public void concurrentPutTest(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < 50; i++) {
                arrayList.add(createTestTableView.put("my", 1).exceptionally(th -> {
                    if (th.getCause() instanceof MetadataStoreTableView.ConflictException) {
                        return null;
                    }
                    Assert.fail("fail to execute concurrent put", th);
                    return null;
                }));
            }
            FutureUtil.waitForAll(arrayList).join();
            assertGet(createTestTableView, "my", 1);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }

    @Test(dataProvider = "impl")
    public void conflictResolverTest(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            createTestTableView.put("my", 0).join();
            createTestTableView.put("my", 0).exceptionally(th -> {
                if (th.getCause() instanceof MetadataStoreTableView.ConflictException) {
                    return null;
                }
                Assert.fail("fail to execute concurrent put", th);
                return null;
            }).join();
            assertGet(createTestTableView, "my", 0);
            createTestTableView.put("my", 1).join();
            assertGet(createTestTableView, "my", 1);
            createTestTableView.put("my", 0).exceptionally(th2 -> {
                if (th2.getCause() instanceof MetadataStoreTableView.ConflictException) {
                    return null;
                }
                Assert.fail("fail to execute concurrent put", th2);
                return null;
            }).join();
            assertGet(createTestTableView, "my", 1);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th3) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th3;
        }
    }

    @Test(dataProvider = "impl")
    public void deleteTest(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            createTestTableView.put("key", 0).join();
            createTestTableView.delete("key").join();
            Assert.assertNull(createTestTableView.get("key"));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void mapApiTest(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            Assert.assertTrue(createTestTableView.isEmpty());
            Assert.assertEquals(createTestTableView.size(), 0);
            createTestTableView.put("my1", 1).join();
            createTestTableView.put("my2", 2).join();
            assertGet(createTestTableView, "my1", 1);
            assertGet(createTestTableView, "my2", 2);
            Assert.assertFalse(createTestTableView.isEmpty());
            Assert.assertEquals(createTestTableView.size(), 2);
            ArrayList arrayList = new ArrayList();
            createTestTableView.forEach((str2, num) -> {
                arrayList.add(str2 + "," + num);
            });
            Assert.assertEquals(arrayList, List.of("my1" + "," + 1, "my2" + "," + 2));
            Collection values = createTestTableView.values();
            Assert.assertEquals(values.size(), 2);
            Assert.assertTrue(values.containsAll(List.of(1, 2)));
            Set keySet = createTestTableView.keySet();
            Assert.assertEquals(keySet.size(), 2);
            Assert.assertTrue(keySet.containsAll(List.of("my1", "my2")));
            Set entrySet = createTestTableView.entrySet();
            Assert.assertEquals(entrySet.size(), 2);
            Assert.assertTrue(entrySet.containsAll(Map.of("my1", 1, "my2", 2).entrySet()));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void notificationListeners(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            String str2 = "tenant/ns" + "/my-1";
            assertGet(createTestTableView, str2, null);
            createTestTableView.put(str2, 1).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str2, 1));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 1);
            createTestTableView.put(str2, 2).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str2, 2));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 2);
            String str3 = str2 + "/my-child-1";
            createTestTableView.put(str3, 0).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str3, 0));
            Assert.assertNull(this.tails.poll(3L, TimeUnit.SECONDS));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 2);
            Assert.assertEquals((Integer) createTestTableView.get(str3), 0);
            createTestTableView.put(str3, 1).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str3, 1));
            Assert.assertNull(this.tails.poll(3L, TimeUnit.SECONDS));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 2);
            Assert.assertEquals((Integer) createTestTableView.get(str3), 1);
            createTestTableView.delete(str3).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str3, (Object) null));
            Assert.assertNull(this.tails.poll(3L, TimeUnit.SECONDS));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 2);
            Assert.assertNull(createTestTableView.get(str3));
            String str4 = "tenant/ns" + "/to-be-filtered";
            createTestTableView.put(str4, 0).join();
            Assert.assertNull(this.tails.poll(3L, TimeUnit.SECONDS));
            Assert.assertEquals((Integer) createTestTableView.get(str2), 2);
            Assert.assertNull(createTestTableView.get(str4));
            createTestTableView.delete(str2).join();
            Assert.assertEquals(this.tails.poll(3L, TimeUnit.SECONDS), Pair.of(str2, (Object) null));
            Assert.assertNull(createTestTableView.get(str2));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void testConcurrentPutGetOneKey(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            final MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            final String str2 = "my";
            final int i = 50;
            createTestTableView.put("my", 0).join();
            final AtomicInteger atomicInteger = new AtomicInteger(0);
            Runnable runnable = new Runnable() { // from class: org.apache.pulsar.metadata.MetadataStoreTableViewTest.1
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < 1000; i2++) {
                        Pair<String, Integer> poll = MetadataStoreTableViewTest.this.tails.poll(3L, TimeUnit.SECONDS);
                        if (poll == null) {
                            break;
                        }
                        Integer valueOf = Integer.valueOf(((Integer) poll.getRight()).intValue() + 1);
                        if (valueOf.intValue() > i) {
                            break;
                        }
                        CompletableFuture put = createTestTableView.put(str2, valueOf);
                        AtomicInteger atomicInteger2 = atomicInteger;
                        Objects.requireNonNull(atomicInteger2);
                        CompletableFuture<Void> thenRun = put.thenRun(atomicInteger2::incrementAndGet);
                        try {
                            thenRun.get();
                        } catch (Exception e) {
                        }
                        MetadataStoreTableViewTest.log.info("Put value {} success:{}. ", valueOf, Boolean.valueOf(!thenRun.isCompletedExceptionally()));
                    }
                }
            };
            CompletableFuture<Void> thenRunAsync = CompletableFuture.completedFuture(null).thenRunAsync(runnable);
            CompletableFuture<Void> thenRunAsync2 = CompletableFuture.completedFuture(null).thenRunAsync(runnable);
            runnable.run();
            thenRunAsync.join();
            thenRunAsync2.join();
            Assert.assertFalse(thenRunAsync.isCompletedExceptionally());
            Assert.assertFalse(thenRunAsync2.isCompletedExceptionally());
            Assert.assertEquals(atomicInteger.get(), 50);
            Assert.assertEquals((Integer) createTestTableView.get("my"), 50);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void testConcurrentPut(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            String str2 = "my";
            int i = 0;
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                createTestTableView.put(str2, Integer.valueOf(i)).join();
            });
            CompletableFuture<Void> runAsync2 = CompletableFuture.runAsync(() -> {
                createTestTableView.put(str2, Integer.valueOf(i)).join();
            });
            Awaitility.await().until(() -> {
                return Boolean.valueOf(runAsync.isDone() && runAsync2.isDone());
            });
            Assert.assertTrue((runAsync.isCompletedExceptionally() && !runAsync2.isCompletedExceptionally()) || (!runAsync.isCompletedExceptionally() && runAsync2.isCompletedExceptionally()));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void testConcurrentDelete(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            String str2 = "my";
            createTestTableView.put("my", 0).join();
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                createTestTableView.delete(str2).join();
            });
            CompletableFuture<Void> runAsync2 = CompletableFuture.runAsync(() -> {
                createTestTableView.delete(str2).join();
            });
            Awaitility.await().until(() -> {
                return Boolean.valueOf(runAsync.isDone() && runAsync2.isDone());
            });
            Assert.assertTrue((runAsync.isCompletedExceptionally() && !runAsync2.isCompletedExceptionally()) || (!runAsync.isCompletedExceptionally() && runAsync2.isCompletedExceptionally()));
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "impl")
    public void testClosedMetadataStore(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            create.close();
            try {
                createTestTableView.put("my", 0).get();
                Assert.fail();
            } catch (Exception e) {
                Assert.assertTrue(e.getCause() instanceof MetadataStoreException.AlreadyClosedException);
            }
            try {
                createTestTableView.delete("my").get();
                Assert.fail();
            } catch (Exception e2) {
                Assert.assertTrue(e2.getCause() instanceof MetadataStoreException.AlreadyClosedException);
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(dataProvider = "distributedImpl")
    public void testGetIfCachedDistributed(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        String str2 = "my";
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
            create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
            try {
                MetadataStoreTableViewImpl<Integer> createTestTableView2 = createTestTableView(create, newKey, supplier);
                Assert.assertNull(createTestTableView.get("my"));
                Assert.assertNull(createTestTableView2.get("my"));
                createTestTableView.put("my", 0).join();
                assertGet(createTestTableView, "my", 0);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals((Integer) createTestTableView2.get(str2), 0);
                });
                createTestTableView2.put("my", 1).join();
                assertGet(createTestTableView2, "my", 1);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertEquals((Integer) createTestTableView.get(str2), 1);
                });
                createTestTableView.delete("my").join();
                assertGet(createTestTableView, "my", null);
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertNull(createTestTableView2.get(str2));
                });
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "distributedImpl")
    public void testInitialFill(String str, Supplier<String> supplier) throws Exception {
        String newKey = newKey();
        MetadataStore create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
        try {
            Assert.assertFalse(createTestTableView(create, newKey, supplier).exists("tenant-1/ns-1/my-1"));
            JSONMetadataSerdeSimpleType jSONMetadataSerdeSimpleType = new JSONMetadataSerdeSimpleType(TypeFactory.defaultInstance().constructSimpleType(Integer.class, (JavaType[]) null));
            create.put(newKey + "/" + "tenant-1/ns-1/my-1", jSONMetadataSerdeSimpleType.serialize(newKey + "/" + "tenant-1/ns-1/my-1", 0), Optional.empty()).join();
            create.put(newKey + "/" + "tenant-1/ns-1/my-2", jSONMetadataSerdeSimpleType.serialize(newKey + "/" + "tenant-1/ns-1/my-2", 1), Optional.empty()).join();
            create.put(newKey + "/" + "tenant-1/ns-2/my-3", jSONMetadataSerdeSimpleType.serialize(newKey + "/" + "tenant-1/ns-2/my-3", 2), Optional.empty()).join();
            create.put(newKey + "/" + "tenant-2/ns-3/my-4", jSONMetadataSerdeSimpleType.serialize(newKey + "/" + "tenant-2/ns-3/my-4", 3), Optional.empty()).join();
            create.put(newKey + "/" + "tenant-2/ns-3/your-1", jSONMetadataSerdeSimpleType.serialize(newKey + "/" + "tenant-2/ns-3/your-1", 4), Optional.empty()).join();
            HashSet hashSet = new HashSet(Set.of(Pair.of("tenant-1/ns-1/my-1", 0), Pair.of("tenant-1/ns-1/my-2", 1), Pair.of("tenant-1/ns-2/my-3", 2), Pair.of("tenant-2/ns-3/my-4", 3)));
            HashSet hashSet2 = new HashSet(hashSet);
            for (int i = 0; i < 4; i++) {
                Assert.assertTrue(hashSet2.remove(this.tails.poll(3L, TimeUnit.SECONDS)));
            }
            Assert.assertNull(this.tails.poll(3L, TimeUnit.SECONDS));
            Assert.assertTrue(hashSet2.isEmpty());
            create = MetadataStoreFactoryImpl.create(supplier.get(), MetadataStoreConfig.builder().build());
            try {
                MetadataStoreTableViewImpl<Integer> createTestTableView = createTestTableView(create, newKey, supplier);
                HashSet hashSet3 = new HashSet(Set.of(Pair.of("tenant-1/ns-1/my-1", 0), Pair.of("tenant-1/ns-1/my-2", 1), Pair.of("tenant-1/ns-2/my-3", 2), Pair.of("tenant-2/ns-3/my-4", 3)));
                Set entrySet = ((Map) hashSet.stream().collect(Collectors.toMap((v0) -> {
                    return v0.getLeft();
                }, (v0) -> {
                    return v0.getRight();
                }))).entrySet();
                for (int i2 = 0; i2 < 4; i2++) {
                    Assert.assertTrue(hashSet3.remove(this.existings.poll(3L, TimeUnit.SECONDS)));
                }
                Assert.assertNull(this.existings.poll(3L, TimeUnit.SECONDS));
                Assert.assertTrue(hashSet3.isEmpty());
                Assert.assertEquals((Integer) createTestTableView.get("tenant-1/ns-1/my-1"), 0);
                Assert.assertEquals((Integer) createTestTableView.get("tenant-1/ns-1/my-2"), 1);
                Assert.assertEquals((Integer) createTestTableView.get("tenant-1/ns-2/my-3"), 2);
                Assert.assertEquals((Integer) createTestTableView.get("tenant-2/ns-3/my-4"), 3);
                Assert.assertNull(createTestTableView.get("tenant-2/ns-3/your-1"));
                Assert.assertEquals(createTestTableView.entrySet(), entrySet);
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }
}
