package com.twitter.distributedlog.impl;

import com.google.common.base.Charsets;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Transaction;
import com.twitter.distributedlog.zk.DefaultZKOp;
import com.twitter.distributedlog.zk.ZKTransaction;
import com.twitter.distributedlog.zk.ZKVersionedSetOp;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.class */
/**
 * ZooKeeper-backed implementation of {@link LogSegmentMetadataStore}.
 *
 * <p>Write operations (create/update/delete a log segment, store max sequence number / max txn id)
 * do not touch ZooKeeper directly; they append ZooKeeper {@link Op}s to a caller-supplied
 * {@link Transaction}.
 *
 * <p>The store also lets callers register {@link LogSegmentNamesListener}s on a log segments path.
 * The store registers itself as the ZooKeeper {@link Watcher} when listing children on behalf of
 * listeners, and re-reads the children when it is notified of a change.
 *
 * <p>{@link #closeLock} guards {@link #closed}: operations that schedule work or touch listeners
 * take the read lock, {@link #close()} takes the write lock.
 */
public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watcher, AsyncCallback.Children2Callback {
    private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);

    final DistributedLogConfiguration conf;
    // Lower / upper bound (in milliseconds) of the retry backoff used by ReadLogSegmentsTask.
    final int minZKBackoffMs;
    final int maxZKBackoffMs;
    // Passed through to LogSegmentMetadata.read(...) in getLogSegment.
    final boolean skipMinVersionCheck;
    final ZooKeeperClient zkc;
    final OrderedScheduler scheduler;
    // Registered listeners, keyed by log segments path.
    final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners =
            new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>();
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    // Read under closeLock.readLock(), written under closeLock.writeLock().
    boolean closed = false;

    /**
     * Reads the children of a log segments path (with the store as watcher) and notifies the
     * listeners registered for that path. On failure it reschedules itself.
     */
    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>> {
        private final String logSegmentsPath;
        private final ZKLogSegmentMetadataStore store;
        // Delay to use for the next retry after a KeeperException; doubles up to maxZKBackoffMs.
        private int currentZKBackOffMs;

        ReadLogSegmentsTask(String logSegmentsPath, ZKLogSegmentMetadataStore store) {
            this.logSegmentsPath = logSegmentsPath;
            this.store = store;
            this.currentZKBackOffMs = store.minZKBackoffMs;
        }

        /**
         * Resets the backoff and, if listeners are still registered for the path, submits a task
         * that delivers {@code segments} to each of them.
         *
         * @param segments the child names returned for the log segments path
         */
        public void onSuccess(final List<String> segments) {
            currentZKBackOffMs = store.minZKBackoffMs;
            final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath);
            if (null == listenerSet) {
                return;
            }
            store.submitTask(logSegmentsPath, new Runnable() {
                @Override
                public void run() {
                    // NOTE(review): the set is iterated without holding its monitor, while
                    // register/unregister mutate it under synchronized(set) - confirm that the
                    // scheduler's ordering makes this safe.
                    for (LogSegmentNamesListener listener : listenerSet) {
                        listener.onSegmentsUpdated(segments);
                    }
                }
            });
        }

        /**
         * Handles a failed read. A {@code NONODE} error drops all listeners for the path and stops;
         * any other {@link KeeperException} retries after the current backoff (which is then
         * doubled, capped at {@code maxZKBackoffMs}); any other failure retries after
         * {@code minZKBackoffMs}.
         *
         * @param cause the failure reported by the read
         */
        public void onFailure(Throwable cause) {
            int backoffMs = store.minZKBackoffMs;
            if (cause instanceof KeeperException) {
                if (KeeperException.Code.NONODE == ((KeeperException) cause).code()) {
                    // The path no longer exists: nothing left to notify about.
                    store.listeners.remove(logSegmentsPath);
                    return;
                }
                backoffMs = currentZKBackOffMs;
                currentZKBackOffMs = Math.min(2 * currentZKBackOffMs, store.maxZKBackoffMs);
            }
            store.scheduleTask(logSegmentsPath, this, backoffMs);
        }

        /**
         * Issues the children read if listeners are still registered for the path; otherwise
         * only logs that the listener has been removed.
         */
        @Override
        public void run() {
            if (null == store.listeners.get(logSegmentsPath)) {
                logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
                return;
            }
            store.getLogSegmentNames(logSegmentsPath, store).addEventListener(this);
        }
    }

    /**
     * Creates a store and caches the ZooKeeper retry backoff bounds and the
     * skip-min-version-check flag from the configuration.
     *
     * @param conf      configuration to read settings from
     * @param zkc       ZooKeeper client used for reads and transactions
     * @param scheduler scheduler used to run listener notifications and retries
     */
    public ZKLogSegmentMetadataStore(DistributedLogConfiguration conf, ZooKeeperClient zkc, OrderedScheduler scheduler) {
        this.conf = conf;
        this.zkc = zkc;
        this.scheduler = scheduler;
        this.minZKBackoffMs = conf.getZKRetryBackoffStartMillis();
        this.maxZKBackoffMs = conf.getZKRetryBackoffMaxMillis();
        this.skipMinVersionCheck = conf.getDLLedgerMetadataSkipMinVersionCheck();
    }

    /**
     * Schedules {@code task} on the scheduler after {@code delayMs} milliseconds, unless the
     * store has been closed (in which case the task is dropped).
     *
     * @param key     key handed to the scheduler together with the task
     * @param task    task to run
     * @param delayMs delay in milliseconds
     */
    protected void scheduleTask(Object key, Runnable task, long delayMs) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                return;
            }
            scheduler.schedule(key, task, delayMs, TimeUnit.MILLISECONDS);
        } finally {
            // Released exactly once; a second unlock would throw IllegalMonitorStateException.
            closeLock.readLock().unlock();
        }
    }

    /**
     * Submits {@code task} to the scheduler, unless the store has been closed (in which case
     * the task is dropped).
     *
     * @param key  key handed to the scheduler together with the task
     * @param task task to run
     */
    protected void submitTask(Object key, Runnable task) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                return;
            }
            scheduler.submit(key, task);
        } finally {
            // Released exactly once; a second unlock would throw IllegalMonitorStateException.
            closeLock.readLock().unlock();
        }
    }

    /**
     * Adds a versioned set-data op to {@code transaction} that writes the serialized log segment
     * sequence number to {@code path}, conditioned on the znode version carried by
     * {@code versioned}.
     *
     * @param transaction transaction to append the op to
     * @param path        znode path to write
     * @param versioned   sequence number plus its version; the version must be a {@link ZkVersion}
     * @param listener    listener attached to the op
     */
    @Override
    public void storeMaxLogSegmentSequenceNumber(Transaction<Object> transaction, String path, Versioned<Long> versioned, Transaction.OpListener<Version> listener) {
        Version version = versioned.getVersion();
        assert version instanceof ZkVersion;
        ZkVersion zkVersion = (ZkVersion) version;
        byte[] data = DLUtils.serializeLogSegmentSequenceNumber(versioned.getValue());
        transaction.addOp(new ZKVersionedSetOp(Op.setData(path, data, zkVersion.getZnodeVersion()), listener));
    }

    /**
     * Adds a versioned set-data op to {@code transaction} that writes the serialized transaction
     * id to {@code path}, conditioned on the znode version carried by {@code versioned}.
     *
     * @param transaction transaction to append the op to
     * @param path        znode path to write
     * @param versioned   transaction id plus its version; the version must be a {@link ZkVersion}
     * @param listener    listener attached to the op
     */
    @Override
    public void storeMaxTxnId(Transaction<Object> transaction, String path, Versioned<Long> versioned, Transaction.OpListener<Version> listener) {
        Version version = versioned.getVersion();
        assert version instanceof ZkVersion;
        ZkVersion zkVersion = (ZkVersion) version;
        byte[] data = DLUtils.serializeTransactionId(versioned.getValue());
        transaction.addOp(new ZKVersionedSetOp(Op.setData(path, data, zkVersion.getZnodeVersion()), listener));
    }

    /**
     * Starts a new transaction backed by this store's ZooKeeper client.
     *
     * @return a new {@link ZKTransaction}
     */
    @Override
    public Transaction<Object> transaction() {
        return new ZKTransaction(zkc);
    }

    /**
     * Adds an op to {@code transaction} that creates a persistent znode at the segment's ZK path,
     * holding the segment's finalised data encoded as UTF-8, with the client's default ACL.
     *
     * @param transaction transaction to append the op to
     * @param segment     log segment to create
     */
    @Override
    public void createLogSegment(Transaction<Object> transaction, LogSegmentMetadata segment) {
        byte[] data = segment.getFinalisedData().getBytes(Charsets.UTF_8);
        transaction.addOp(DefaultZKOp.of(
                Op.create(segment.getZkPath(), data, zkc.getDefaultACL(), CreateMode.PERSISTENT)));
    }

    /**
     * Adds an op to {@code transaction} that deletes the znode at the segment's ZK path.
     * The delete uses version {@code -1}, i.e. it is not conditioned on a znode version.
     *
     * @param transaction transaction to append the op to
     * @param segment     log segment to delete
     */
    @Override
    public void deleteLogSegment(Transaction<Object> transaction, LogSegmentMetadata segment) {
        transaction.addOp(DefaultZKOp.of(Op.delete(segment.getZkPath(), -1)));
    }

    /**
     * Adds an op to {@code transaction} that overwrites the znode at the segment's ZK path with
     * the segment's finalised data encoded as UTF-8. The write uses version {@code -1}, i.e. it
     * is not conditioned on a znode version.
     *
     * @param transaction transaction to append the op to
     * @param segment     log segment to update
     */
    @Override
    public void updateLogSegment(Transaction<Object> transaction, LogSegmentMetadata segment) {
        byte[] data = segment.getFinalisedData().getBytes(Charsets.UTF_8);
        transaction.addOp(DefaultZKOp.of(Op.setData(segment.getZkPath(), data, -1)));
    }

    /**
     * ZooKeeper watcher callback.
     *
     * <ul>
     *   <li>Session expired: schedules an immediate re-read for every path that has listeners
     *       (presumably because the watches set by the expired session are gone).</li>
     *   <li>{@code NodeDeleted} on a path: drops the listeners registered for it.</li>
     *   <li>{@code NodeChildrenChanged} on a path: re-reads the children inline.</li>
     *   <li>Anything else, or an event without a path: ignored.</li>
     * </ul>
     *
     * @param event the event delivered by ZooKeeper
     */
    public void process(WatchedEvent event) {
        Watcher.Event.EventType type = event.getType();
        if (Watcher.Event.EventType.None == type && Watcher.Event.KeeperState.Expired == event.getState()) {
            // Iterate over a snapshot of the keys so concurrent (un)registrations do not interfere.
            for (String logSegmentsPath : new HashSet<String>(listeners.keySet())) {
                scheduleTask(logSegmentsPath, new ReadLogSegmentsTask(logSegmentsPath, this), 0L);
            }
            return;
        }
        String path = event.getPath();
        if (null == path) {
            return;
        }
        if (Watcher.Event.EventType.NodeDeleted == type) {
            listeners.remove(path);
        } else if (Watcher.Event.EventType.NodeChildrenChanged == type) {
            new ReadLogSegmentsTask(path, this).run();
        }
    }

    /**
     * Reads the log segment metadata stored at {@code path}.
     *
     * @param path znode path of the log segment
     * @return future of the metadata, as produced by {@code LogSegmentMetadata.read}
     */
    @Override
    public Future<LogSegmentMetadata> getLogSegment(String path) {
        return LogSegmentMetadata.read(zkc, path, skipMinVersionCheck);
    }

    /**
     * Lists the children of {@code path} without setting a watch.
     *
     * @param path log segments path
     * @return future of the child names
     */
    @Override
    public Future<List<String>> getLogSegmentNames(String path) {
        return getLogSegmentNames(path, null);
    }

    /**
     * Lists the children of {@code path} asynchronously; the result is delivered through
     * {@link #processResult}. Failures to obtain the ZooKeeper handle fail the returned future
     * instead of being thrown.
     *
     * @param path    log segments path
     * @param watcher watcher to pass to ZooKeeper, or {@code null} for none
     * @return future of the child names
     */
    Future<List<String>> getLogSegmentNames(String path, Watcher watcher) {
        Promise<List<String>> result = new Promise<List<String>>();
        try {
            // The promise travels as the callback context and is completed in processResult.
            zkc.get().getChildren(path, watcher, this, result);
        } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
            result.setException(FutureUtils.zkException(e, path));
        } catch (InterruptedException e) {
            result.setException(FutureUtils.zkException(e, path));
        }
        return result;
    }

    /**
     * Children callback for {@link #getLogSegmentNames(String, Watcher)}: completes the promise
     * passed as {@code ctx} with the children on {@code OK}, or with the corresponding
     * {@link KeeperException} otherwise.
     *
     * @param rc       ZooKeeper result code
     * @param path     path the request was issued for
     * @param ctx      the promise supplied when the request was issued
     * @param children child names (used only when {@code rc} is {@code OK})
     * @param stat     znode stat (unused)
     */
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        // Safe: the only context ever passed to getChildren by this class is a Promise<List<String>>.
        @SuppressWarnings("unchecked")
        Promise<List<String>> result = (Promise<List<String>>) ctx;
        if (KeeperException.Code.OK.intValue() == rc) {
            result.setValue(children);
        } else {
            result.setException(KeeperException.create(KeeperException.Code.get(rc)));
        }
    }

    /**
     * Registers {@code listener} for {@code path} and immediately triggers a read of the path's
     * children. Does nothing when {@code listener} is {@code null} or the store is closed.
     *
     * @param path     log segments path
     * @param listener listener to notify when the segment names are read
     */
    @Override
    public void registerLogSegmentListener(String path, LogSegmentNamesListener listener) {
        if (null == listener) {
            return;
        }
        closeLock.readLock().lock();
        try {
            if (closed) {
                return;
            }
            Set<LogSegmentNamesListener> listenerSet = listeners.get(path);
            if (null == listenerSet) {
                Set<LogSegmentNamesListener> newSet = new HashSet<LogSegmentNamesListener>();
                Set<LogSegmentNamesListener> existing = listeners.putIfAbsent(path, newSet);
                listenerSet = null != existing ? existing : newSet;
            }
            synchronized (listenerSet) {
                listenerSet.add(listener);
                // The set may have been removed from the map in the meantime (e.g. by a
                // concurrent unregister that emptied it); put it back if no mapping exists.
                listeners.putIfAbsent(path, listenerSet);
            }
            new ReadLogSegmentsTask(path, this).run();
        } finally {
            // Released exactly once; a second unlock would throw IllegalMonitorStateException.
            closeLock.readLock().unlock();
        }
    }

    /**
     * Removes {@code listener} from the listeners of {@code path}; when that leaves the set
     * empty, the set itself is removed from the map. Does nothing when the store is closed or
     * no listeners are registered for the path.
     *
     * @param path     log segments path
     * @param listener listener to remove
     */
    @Override
    public void unregisterLogSegmentListener(String path, LogSegmentNamesListener listener) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                return;
            }
            Set<LogSegmentNamesListener> listenerSet = listeners.get(path);
            if (null == listenerSet) {
                return;
            }
            synchronized (listenerSet) {
                listenerSet.remove(listener);
                if (listenerSet.isEmpty()) {
                    // Conditional remove: only drops the mapping if it still points at this set.
                    listeners.remove(path, listenerSet);
                }
            }
        } finally {
            // Released exactly once; a second unlock would throw IllegalMonitorStateException.
            closeLock.readLock().unlock();
        }
    }

    /**
     * Marks the store as closed. After this, {@link #scheduleTask}, {@link #submitTask} and the
     * listener (un)registration methods become no-ops. Calling it again has no further effect.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        closeLock.writeLock().lock();
        try {
            closed = true;
        } finally {
            // Released exactly once; a second unlock would throw IllegalMonitorStateException.
            closeLock.writeLock().unlock();
        }
    }
}
