package io.streamthoughts.kafka.connect.filepulse.state;

import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.state.internal.OpaqueMemoryResource;
import io.streamthoughts.kafka.connect.filepulse.state.internal.ResourceInitializer;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/state/FileObjectStateBackingStoreManager.class */
public final class FileObjectStateBackingStoreManager {
    public static final FileObjectStateBackingStoreManager INSTANCE = new FileObjectStateBackingStoreManager();
    private static final Logger LOG = LoggerFactory.getLogger(FileObjectStateBackingStoreManager.class);
    private final ReentrantLock lock = new ReentrantLock();
    private final ConcurrentHashMap<String, LeasedResource<StateBackingStore<FileObject>>> leasedResources = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/streamthoughts/kafka/connect/filepulse/state/FileObjectStateBackingStoreManager$LeasedResource.class */
    public static final class LeasedResource<T extends AutoCloseable> implements AutoCloseable {
        private final T resource;
        private final HashSet<Object> leaseHolders = new HashSet<>();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public LeasedResource(T t) {
            this.resource = (T) Objects.requireNonNull(t, "resource should not be null");
        }

        public T getResource() {
            return this.resource;
        }

        void addLeaseHolder(Object obj) {
            this.leaseHolders.add(obj);
        }

        boolean removeLeaseHolder(Object obj) {
            this.leaseHolders.remove(obj);
            return this.leaseHolders.isEmpty();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            if (this.closed.compareAndSet(false, true)) {
                this.resource.close();
            }
        }
    }

    public OpaqueMemoryResource<StateBackingStore<FileObject>> getOrCreateSharedStore(String str, ResourceInitializer<StateBackingStore<FileObject>> resourceInitializer, Object obj) throws Exception {
        this.lock.lock();
        try {
            LeasedResource<StateBackingStore<FileObject>> leasedResource = this.leasedResources.get(str);
            if (leasedResource == null) {
                LOG.info("Initializing shared StateBackingStore '{}'", str);
                leasedResource = new LeasedResource<>(resourceInitializer.apply());
                this.leasedResources.put(str, leasedResource);
            }
            leasedResource.addLeaseHolder(obj);
            OpaqueMemoryResource<StateBackingStore<FileObject>> opaqueMemoryResource = new OpaqueMemoryResource<>(leasedResource.getResource(), () -> {
                release(str, obj);
            });
            this.lock.unlock();
            return opaqueMemoryResource;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    void release(String str, Object obj) throws Exception {
        this.lock.lock();
        try {
            LeasedResource<StateBackingStore<FileObject>> leasedResource = this.leasedResources.get(str);
            if (leasedResource == null) {
                return;
            }
            String name = Thread.currentThread().getName();
            LOG.info("[{}] Releasing access on shared StateBackingStore '{}' instance.", str, name);
            if (leasedResource.removeLeaseHolder(obj)) {
                LOG.info("[{}] Closing shared StateBackingStore '{}'", str, name);
                leasedResource.close();
                this.leasedResources.remove(str);
            }
            this.lock.unlock();
        } finally {
            this.lock.unlock();
        }
    }
}
