package org.apache.pulsar.broker.service.plugin;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.nar.NarClassLoaderBuilder;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/plugin/EntryFilterProvider.class */
public class EntryFilterProvider implements AutoCloseable {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(EntryFilterProvider.class);

    @VisibleForTesting
    static final String ENTRY_FILTER_DEFINITION_FILE = "entry_filter";
    private final ServiceConfiguration serviceConfiguration;

    @VisibleForTesting
    protected Map<String, EntryFilterMetaData> definitions;

    @VisibleForTesting
    protected Map<String, NarClassLoader> cachedClassLoaders;

    @VisibleForTesting
    protected List<EntryFilter> brokerEntryFilters;

    public EntryFilterProvider(ServiceConfiguration serviceConfiguration) throws IOException {
        this.serviceConfiguration = serviceConfiguration;
        initialize();
        initializeBrokerEntryFilters();
    }

    protected void initializeBrokerEntryFilters() throws IOException {
        if (this.serviceConfiguration.getEntryFilterNames().isEmpty()) {
            this.brokerEntryFilters = Collections.emptyList();
        } else {
            this.brokerEntryFilters = loadEntryFilters(this.serviceConfiguration.getEntryFilterNames());
        }
    }

    public void validateEntryFilters(String str) throws InvalidEntryFilterException {
        if (StringUtils.isBlank(str)) {
            return;
        }
        for (String str2 : readEntryFiltersString(str)) {
            if (this.definitions.get(str2) == null) {
                throw new InvalidEntryFilterException("Entry filter '" + str2 + "' not found");
            }
        }
    }

    private List<String> readEntryFiltersString(String str) {
        return Arrays.stream(str.split(",")).filter(str2 -> {
            return StringUtils.isNotBlank(str2);
        }).toList();
    }

    public List<EntryFilter> loadEntryFiltersForPolicy(EntryFilters entryFilters) throws IOException {
        String entryFilterNames = entryFilters.getEntryFilterNames();
        return StringUtils.isBlank(entryFilterNames) ? Collections.emptyList() : loadEntryFilters(readEntryFiltersString(entryFilterNames));
    }

    private List<EntryFilter> loadEntryFilters(Collection<String> collection) throws IOException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        for (String str : collection) {
            EntryFilterMetaData entryFilterMetaData = this.definitions.get(str);
            if (null == entryFilterMetaData) {
                throw new RuntimeException("No entry filter is found for name `" + str + "`. Available entry filters are : " + this.definitions.keySet());
            }
            builder.put(str, load(entryFilterMetaData));
            log.info("Successfully loaded entry filter `{}`", str);
        }
        return builder.build().values().asList();
    }

    public List<EntryFilter> getBrokerEntryFilters() {
        return this.brokerEntryFilters;
    }

    private void initialize() throws IOException {
        Path normalize = Paths.get(this.serviceConfiguration.getEntryFiltersDirectory(), new String[0]).toAbsolutePath().normalize();
        log.info("Searching for entry filters in {}", normalize);
        if (!normalize.toFile().exists()) {
            log.info("Pulsar entry filters directory not found");
            this.definitions = Collections.emptyMap();
            this.cachedClassLoaders = Collections.emptyMap();
            return;
        }
        HashMap hashMap = new HashMap();
        this.cachedClassLoaders = new HashMap();
        DirectoryStream<Path> newDirectoryStream = Files.newDirectoryStream(normalize, "*.nar");
        try {
            for (Path path : newDirectoryStream) {
                try {
                    EntryFilterDefinition entryFilterDefinition = getEntryFilterDefinition(loadNarClassLoader(path));
                    log.info("Found entry filter from {} : {}", path, entryFilterDefinition);
                    Preconditions.checkArgument(StringUtils.isNotBlank(entryFilterDefinition.getName()));
                    Preconditions.checkArgument(StringUtils.isNotBlank(entryFilterDefinition.getEntryFilterClass()));
                    EntryFilterMetaData entryFilterMetaData = new EntryFilterMetaData();
                    entryFilterMetaData.setDefinition(entryFilterDefinition);
                    entryFilterMetaData.setArchivePath(path);
                    hashMap.put(entryFilterDefinition.getName(), entryFilterMetaData);
                } catch (Throwable th) {
                    log.warn("Failed to load entry filters from {}. It is OK however if you want to use this entry filters, please make sure you put the correct entry filter NAR package in the entry filter directory.", path, th);
                }
            }
            if (newDirectoryStream != null) {
                newDirectoryStream.close();
            }
            this.definitions = Collections.unmodifiableMap(hashMap);
            this.cachedClassLoaders = Collections.unmodifiableMap(this.cachedClassLoaders);
        } catch (Throwable th2) {
            if (newDirectoryStream != null) {
                try {
                    newDirectoryStream.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @VisibleForTesting
    static EntryFilterDefinition getEntryFilterDefinition(NarClassLoader narClassLoader) throws IOException {
        String serviceDefinition;
        try {
            serviceDefinition = narClassLoader.getServiceDefinition("entry_filter.yaml");
        } catch (NoSuchFileException e) {
            serviceDefinition = narClassLoader.getServiceDefinition("entry_filter.yml");
        }
        return (EntryFilterDefinition) ObjectMapperFactory.getYamlMapper().reader().readValue(serviceDefinition, EntryFilterDefinition.class);
    }

    protected EntryFilter load(EntryFilterMetaData entryFilterMetaData) throws IOException {
        EntryFilterDefinition definition = entryFilterMetaData.getDefinition();
        if (StringUtils.isBlank(definition.getEntryFilterClass())) {
            throw new RuntimeException("Entry filter `" + definition.getName() + "` does NOT provide a entry filters implementation");
        }
        try {
            NarClassLoader narClassLoader = getNarClassLoader(entryFilterMetaData.getArchivePath());
            if (narClassLoader == null) {
                throw new RuntimeException("Entry filter `" + definition.getName() + "` cannot be loaded, see the broker logs for further details");
            }
            Object newInstance = narClassLoader.loadClass(definition.getEntryFilterClass()).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof EntryFilter) {
                return new EntryFilterWithClassLoader((EntryFilter) newInstance, narClassLoader, false);
            }
            throw new IOException("Class " + definition.getEntryFilterClass() + " does not implement entry filter interface");
        } catch (Throwable th) {
            if (th instanceof IOException) {
                throw ((IOException) th);
            }
            log.error("Failed to load class {}", entryFilterMetaData.getDefinition().getEntryFilterClass(), th);
            throw new IOException(th);
        }
    }

    private NarClassLoader getNarClassLoader(Path path) {
        return this.cachedClassLoaders.get(classLoaderKey(path));
    }

    private NarClassLoader loadNarClassLoader(Path path) {
        return this.cachedClassLoaders.computeIfAbsent(classLoaderKey(path), str -> {
            try {
                return NarClassLoaderBuilder.builder().narFile(path.toAbsolutePath().normalize().toFile()).parentClassLoader(EntryFilter.class.getClassLoader()).extractionDirectory(this.serviceConfiguration.getNarExtractionDirectory()).build();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private static String classLoaderKey(Path path) {
        return path.toString();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.brokerEntryFilters.forEach(entryFilter -> {
            try {
                entryFilter.close();
            } catch (Throwable th) {
                log.warn("Error shutting down entry filter {}", entryFilter, th);
            }
        });
        this.cachedClassLoaders.forEach((str, narClassLoader) -> {
            try {
                narClassLoader.close();
            } catch (Throwable th) {
                log.warn("Error closing entry filter class loader {}", str, th);
            }
        });
    }
}
