package org.apache.hadoop.hbase.replication.regionserver;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hbase.thirdparty.com.google.protobuf.ProtocolStringList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.class */
public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable {
    private static final Logger LOG = LoggerFactory.getLogger(ReplaySyncReplicationWALCallable.class);
    private static final String REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = "hbase.replay.sync.replication.wal.batch.size";
    private static final long DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE = 8388608;
    private String peerId;
    private long batchSize;
    private List<String> wals = new ArrayList();
    private final KeyLocker<String> peersLock = new KeyLocker<>();

    @Override // org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable
    protected void doCall() throws Exception {
        LOG.info("Received a replay sync replication wals {} event, peerId={}", this.wals, this.peerId);
        if (this.rs.getReplicationSinkService() != null) {
            ReentrantLock acquireLock = this.peersLock.acquireLock(this.wals.get(0));
            try {
                Iterator<String> it = this.wals.iterator();
                while (it.hasNext()) {
                    replayWAL(it.next());
                }
            } finally {
                acquireLock.unlock();
            }
        }
    }

    @Override // org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable
    protected void initParameter(byte[] bArr) throws InvalidProtocolBufferException {
        MasterProcedureProtos.ReplaySyncReplicationWALParameter parseFrom = MasterProcedureProtos.ReplaySyncReplicationWALParameter.parseFrom(bArr);
        this.peerId = parseFrom.getPeerId();
        ProtocolStringList walList = parseFrom.getWalList();
        List<String> list = this.wals;
        list.getClass();
        walList.forEach((v1) -> {
            r1.add(v1);
        });
        this.batchSize = this.rs.getConfiguration().getLong(REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE, DEFAULT_REPLAY_SYNC_REPLICATION_WAL_BATCH_SIZE);
    }

    @Override // org.apache.hadoop.hbase.procedure2.RSProcedureCallable
    public EventType getEventType() {
        return EventType.RS_REPLAY_SYNC_REPLICATION_WAL;
    }

    private void replayWAL(String str) throws IOException {
        WAL.Reader reader = getReader(str);
        Throwable th = null;
        try {
            try {
                List<WAL.Entry> readWALEntries = readWALEntries(reader);
                while (!readWALEntries.isEmpty()) {
                    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest = ReplicationProtobufUtil.buildReplicateWALEntryRequest((WAL.Entry[]) readWALEntries.toArray(new WAL.Entry[readWALEntries.size()]));
                    AdminProtos.ReplicateWALEntryRequest replicateWALEntryRequest = (AdminProtos.ReplicateWALEntryRequest) buildReplicateWALEntryRequest.getFirst();
                    this.rs.getReplicationSinkService().replicateLogEntries(replicateWALEntryRequest.getEntryList(), (CellScanner) buildReplicateWALEntryRequest.getSecond(), replicateWALEntryRequest.getReplicationClusterId(), replicateWALEntryRequest.getSourceBaseNamespaceDirPath(), replicateWALEntryRequest.getSourceHFileArchiveDirPath());
                    readWALEntries = readWALEntries(reader);
                }
                if (reader != null) {
                    if (0 == 0) {
                        reader.close();
                        return;
                    }
                    try {
                        reader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (reader != null) {
                if (th != null) {
                    try {
                        reader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    reader.close();
                }
            }
            throw th4;
        }
    }

    private WAL.Reader getReader(String str) throws IOException {
        Path path = new Path(this.rs.getWALRootDir(), str);
        long len = this.rs.getWALFileSystem().getFileStatus(path).getLen();
        try {
            RecoverLeaseFSUtils.recoverFileLease(this.rs.getWALFileSystem(), path, this.rs.getConfiguration());
            return WALFactory.createReader(this.rs.getWALFileSystem(), path, this.rs.getConfiguration());
        } catch (EOFException e) {
            if (len > 0) {
                throw e;
            }
            LOG.warn("File is empty. Could not open {} for reading because {}", path, e);
            return null;
        }
    }

    private boolean filter(WAL.Entry entry) {
        WALEdit edit = entry.getEdit();
        WALUtil.filterCells(edit, cell -> {
            if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
                return null;
            }
            return cell;
        });
        return !edit.isEmpty();
    }

    private List<WAL.Entry> readWALEntries(WAL.Reader reader) throws IOException {
        ArrayList arrayList = new ArrayList();
        if (reader == null) {
            return arrayList;
        }
        long j = 0;
        while (true) {
            WAL.Entry next = reader.next();
            if (next == null) {
                break;
            }
            if (filter(next)) {
                arrayList.add(next);
                j += next.getEdit().heapSize();
                if (j > this.batchSize) {
                    break;
                }
            }
        }
        return arrayList;
    }
}
