package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.PeerActionListener;
import org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Streams;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.class */
public class SyncReplicationWALProvider implements WALProvider, PeerActionListener {
    public static final String DUAL_WAL_IMPL = "hbase.wal.sync.impl";
    private final WALProvider provider;
    private WALFactory factory;
    private Configuration conf;
    private EventLoopGroup eventLoopGroup;
    private Class<? extends Channel> channelClass;
    private static final Logger LOG = LoggerFactory.getLogger(SyncReplicationWALProvider.class);
    private static final Pattern LOG_PREFIX_PATTERN = Pattern.compile(".*-\\d+-(.+)");
    private SyncReplicationPeerInfoProvider peerInfoProvider = new DefaultSyncReplicationPeerInfoProvider();
    private List<WALActionsListener> listeners = new ArrayList();
    private AtomicBoolean initialized = new AtomicBoolean(false);
    private final ConcurrentMap<String, Optional<DualAsyncFSWAL>> peerId2WAL = new ConcurrentHashMap();
    private final KeyLocker<String> createLock = new KeyLocker<>();

    /* loaded from: input_file:org/apache/hadoop/hbase/wal/SyncReplicationWALProvider$DefaultSyncReplicationPeerInfoProvider.class */
    private static class DefaultSyncReplicationPeerInfoProvider implements SyncReplicationPeerInfoProvider {
        private DefaultSyncReplicationPeerInfoProvider() {
        }

        @Override // org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider
        public Optional<Pair<String, String>> getPeerIdAndRemoteWALDir(TableName tableName) {
            return Optional.empty();
        }

        @Override // org.apache.hadoop.hbase.replication.regionserver.SyncReplicationPeerInfoProvider
        public boolean checkState(TableName tableName, BiPredicate<SyncReplicationState, SyncReplicationState> biPredicate) {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SyncReplicationWALProvider(WALProvider wALProvider) {
        this.provider = wALProvider;
    }

    public void setPeerInfoProvider(SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider) {
        this.peerInfoProvider = syncReplicationPeerInfoProvider;
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public void init(WALFactory wALFactory, Configuration configuration, String str, Abortable abortable) throws IOException {
        if (!this.initialized.compareAndSet(false, true)) {
            throw new IllegalStateException("WALProvider.init should only be called once.");
        }
        this.provider.init(wALFactory, configuration, str, abortable);
        this.conf = configuration;
        this.factory = wALFactory;
        Pair<EventLoopGroup, Class<? extends Channel>> eventLoopConfig = NettyAsyncFSWALConfigHelper.getEventLoopConfig(configuration);
        this.eventLoopGroup = (EventLoopGroup) eventLoopConfig.getFirst();
        this.channelClass = (Class) eventLoopConfig.getSecond();
    }

    private String getLogPrefix(String str) {
        return this.factory.factoryId + "-" + EnvironmentEdgeManager.currentTime() + "-" + str;
    }

    private DualAsyncFSWAL createWAL(String str, String str2) throws IOException {
        Class cls = this.conf.getClass(DUAL_WAL_IMPL, DualAsyncFSWAL.class, DualAsyncFSWAL.class);
        try {
            Constructor<?> constructor = null;
            Constructor<?>[] declaredConstructors = cls.getDeclaredConstructors();
            int length = declaredConstructors.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                Constructor<?> constructor2 = declaredConstructors[i];
                if (constructor2.getParameterCount() > 0) {
                    constructor = constructor2;
                    break;
                }
                i++;
            }
            if (constructor == null) {
                throw new IllegalArgumentException("No valid constructor provided for class " + cls);
            }
            constructor.setAccessible(true);
            return (DualAsyncFSWAL) constructor.newInstance(CommonFSUtils.getWALFileSystem(this.conf), ReplicationUtils.getRemoteWALFileSystem(this.conf, str2), CommonFSUtils.getWALRootDir(this.conf), ReplicationUtils.getPeerRemoteWALDir(str2, str), AbstractFSWALProvider.getWALDirectoryName(this.factory.factoryId), AbstractFSWALProvider.getWALArchiveDirectoryName(this.conf, this.factory.factoryId), this.conf, this.listeners, true, getLogPrefix(str), ".syncrep", this.eventLoopGroup, this.channelClass);
        } catch (IllegalAccessException | InstantiationException e) {
            throw new RuntimeException(e);
        } catch (InvocationTargetException e2) {
            Throwable targetException = e2.getTargetException();
            Throwables.propagateIfPossible(targetException, IOException.class);
            throw new RuntimeException(targetException);
        }
    }

    private DualAsyncFSWAL getWAL(String str, String str2) throws IOException {
        Optional<DualAsyncFSWAL> optional = this.peerId2WAL.get(str);
        if (optional != null) {
            return optional.orElse(null);
        }
        ReentrantLock acquireLock = this.createLock.acquireLock(str);
        try {
            Optional<DualAsyncFSWAL> optional2 = this.peerId2WAL.get(str);
            if (optional2 != null) {
                DualAsyncFSWAL orElse = optional2.orElse(null);
                acquireLock.unlock();
                return orElse;
            }
            DualAsyncFSWAL createWAL = createWAL(str, str2);
            boolean z = false;
            try {
                createWAL.init();
                z = true;
                if (1 == 0) {
                    createWAL.close();
                }
                this.peerId2WAL.put(str, Optional.of(createWAL));
                acquireLock.unlock();
                return createWAL;
            } catch (Throwable th) {
                if (!z) {
                    createWAL.close();
                }
                throw th;
            }
        } catch (Throwable th2) {
            acquireLock.unlock();
            throw th2;
        }
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public WAL getWAL(RegionInfo regionInfo) throws IOException {
        if (regionInfo == null) {
            return this.provider.getWAL(null);
        }
        DualAsyncFSWAL dualAsyncFSWAL = null;
        Optional<Pair<String, String>> peerIdAndRemoteWALDir = this.peerInfoProvider.getPeerIdAndRemoteWALDir(regionInfo.getTable());
        if (peerIdAndRemoteWALDir.isPresent()) {
            Pair<String, String> pair = peerIdAndRemoteWALDir.get();
            dualAsyncFSWAL = getWAL((String) pair.getFirst(), (String) pair.getSecond());
        }
        return dualAsyncFSWAL != null ? dualAsyncFSWAL : this.provider.getWAL(regionInfo);
    }

    private Stream<WAL> getWALStream() {
        return Streams.concat(new Stream[]{this.peerId2WAL.values().stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }), this.provider.getWALs().stream()});
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public List<WAL> getWALs() {
        return (List) getWALStream().collect(Collectors.toList());
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public void shutdown() throws IOException {
        IOException iOException = null;
        for (Optional<DualAsyncFSWAL> optional : this.peerId2WAL.values()) {
            if (optional.isPresent()) {
                try {
                    optional.get().shutdown();
                } catch (IOException e) {
                    LOG.error("Shutdown WAL failed", e);
                    iOException = e;
                }
            }
        }
        this.provider.shutdown();
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public void close() throws IOException {
        IOException iOException = null;
        for (Optional<DualAsyncFSWAL> optional : this.peerId2WAL.values()) {
            if (optional.isPresent()) {
                try {
                    optional.get().close();
                } catch (IOException e) {
                    LOG.error("Close WAL failed", e);
                    iOException = e;
                }
            }
        }
        this.provider.close();
        if (iOException != null) {
            throw iOException;
        }
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public long getNumLogFiles() {
        return this.peerId2WAL.size() + this.provider.getNumLogFiles();
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public long getLogFileSize() {
        return this.peerId2WAL.values().stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).mapToLong((v0) -> {
            return v0.getLogFileSize();
        }).sum() + this.provider.getLogFileSize();
    }

    private void safeClose(WAL wal) {
        if (wal != null) {
            try {
                wal.close();
            } catch (IOException e) {
                LOG.error("Close WAL failed", e);
            }
        }
    }

    @Override // org.apache.hadoop.hbase.wal.WALProvider
    public void addWALActionsListener(WALActionsListener wALActionsListener) {
        this.listeners.add(wALActionsListener);
        this.provider.addWALActionsListener(wALActionsListener);
    }

    @Override // org.apache.hadoop.hbase.replication.regionserver.PeerActionListener
    public void peerSyncReplicationStateChange(String str, SyncReplicationState syncReplicationState, SyncReplicationState syncReplicationState2, int i) {
        if (syncReplicationState == SyncReplicationState.ACTIVE) {
            if (i != 0) {
                if (i == 1) {
                    this.peerId2WAL.remove(str).ifPresent((v1) -> {
                        safeClose(v1);
                    });
                    return;
                }
                return;
            }
            ReentrantLock acquireLock = this.createLock.acquireLock(str);
            try {
                Optional<DualAsyncFSWAL> optional = this.peerId2WAL.get(str);
                if (optional != null) {
                    optional.ifPresent(dualAsyncFSWAL -> {
                        dualAsyncFSWAL.skipRemoteWAL(syncReplicationState2 == SyncReplicationState.STANDBY);
                    });
                } else {
                    this.peerId2WAL.put(str, Optional.empty());
                }
            } finally {
                acquireLock.unlock();
            }
        }
    }

    public static Optional<String> getSyncReplicationPeerIdFromWALName(String str) {
        if (!str.endsWith(".syncrep")) {
            return Optional.empty();
        }
        Matcher matcher = LOG_PREFIX_PATTERN.matcher(AbstractFSWALProvider.getWALPrefixFromWALName(str));
        return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    WALProvider getWrappedProvider() {
        return this.provider;
    }
}
