package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.collections.map.MultiValueMap;
import org.apache.flume.channel.file.LogFile;
import org.apache.flume.channel.file.TransactionEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/channel/file/ReplayHandler.class */
/**
 * Replays transaction log files into a {@link FlumeEventQueue}.
 *
 * <p>Records whose timestamp is not newer than the queue timestamp captured at
 * construction are skipped. For the remaining records, event pointers are
 * grouped per transaction id until the transaction's COMMIT or ROLLBACK record
 * is read: a committed PUT appends its pointers to the queue tail, a committed
 * TAKE removes its pointers from the queue, and a ROLLBACK discards them.
 *
 * <p>A committed take whose pointer is not in the queue is remembered as
 * "pending"; if a put for the same pointer is committed later during the
 * replay, the two cancel each other out.
 */
class ReplayHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReplayHandler.class);
    private final FlumeEventQueue queue;
    /** Queue timestamp captured at construction; records at or before it are skipped. */
    private final long lastCheckpoint;
    /** Committed takes (encoded as longs) that could not be removed from the queue. */
    private final List<Long> pendingTakes = Lists.newArrayList();

    /**
     * Creates a handler that replays into the given queue.
     *
     * @param flumeEventQueue queue that receives replayed puts and takes; its
     *     current timestamp is recorded as the last checkpoint
     */
    public ReplayHandler(FlumeEventQueue flumeEventQueue) {
        this.queue = flumeEventQueue;
        this.lastCheckpoint = flumeEventQueue.getTimestamp();
    }

    /**
     * Replays the given log files, in iteration order, into the queue.
     *
     * <p>Uncommitted transactions are tracked across files, so a transaction
     * started in one file may be committed by a record in a later one.
     *
     * @param list log files to replay
     * @throws IOException if a log file cannot be read or closed; hitting the
     *     end of a file unexpectedly is logged and does not abort the replay
     */
    public void replayLog(List<File> list) throws IOException {
        int total = 0;
        // transaction id -> pointers recorded for that transaction so far
        MultiValueMap transactionMap = new MultiValueMap();
        LOG.info("Starting replay of " + list);
        for (File log : list) {
            total += replayFile(log, transactionMap);
        }
        reportPendingTakes();
        LOG.info("Replayed " + total);
    }

    /**
     * Replays a single log file and returns the number of event pointers that
     * were applied through committed transactions read from it.
     */
    private int replayFile(File log, MultiValueMap transactionMap) throws IOException {
        LOG.info("Replaying " + log);
        int count = 0;
        LogFile.SequentialReader reader = null;
        try {
            reader = new LogFile.SequentialReader(log);
            reader.skipToLastCheckpointPosition(this.queue.getTimestamp());
            // Puts are addressed by the id of the file being read plus the
            // record offset; takes carry their own file id and offset.
            int fileId = reader.getLogFileID();
            int readCount = 0;
            int putCount = 0;
            int takeCount = 0;
            int rollbackCount = 0;
            int commitCount = 0;
            int skipCount = 0;
            Pair<Integer, TransactionEventRecord> entry;
            while ((entry = reader.next()) != null) {
                int offset = entry.getLeft().intValue();
                TransactionEventRecord record = entry.getRight();
                short type = record.getRecordType();
                Long trans = Long.valueOf(record.getTransactionID());
                readCount++;
                if (record.getTimestamp() <= this.lastCheckpoint) {
                    skipCount++;
                } else if (type == TransactionEventRecord.Type.PUT.get()) {
                    putCount++;
                    transactionMap.put(trans, new FlumeEventPointer(fileId, offset));
                } else if (type == TransactionEventRecord.Type.TAKE.get()) {
                    takeCount++;
                    Take take = (Take) record;
                    transactionMap.put(trans, new FlumeEventPointer(take.getFileID(), take.getOffset()));
                } else if (type == TransactionEventRecord.Type.ROLLBACK.get()) {
                    rollbackCount++;
                    transactionMap.remove(trans);
                } else if (type == TransactionEventRecord.Type.COMMIT.get()) {
                    commitCount++;
                    // The map is a raw commons-collections type; this handler only
                    // ever stores FlumeEventPointer values in it.
                    @SuppressWarnings("unchecked")
                    Collection<FlumeEventPointer> pointers = (Collection<FlumeEventPointer>) transactionMap.remove(trans);
                    if (pointers != null && pointers.size() > 0) {
                        processCommit(((Commit) record).getType(), pointers);
                        count += pointers.size();
                    }
                } else {
                    Preconditions.checkArgument(false, "Unknown record type: " + Integer.toHexString(type));
                }
            }
            LOG.info("Replayed " + count + " from " + log);
            if (LOG.isDebugEnabled()) {
                LOG.debug("read: " + readCount + ", put: " + putCount + ", take: " + takeCount + ", rollback: " + rollbackCount + ", commit: " + commitCount + ", skipp: " + skipCount);
            }
        } catch (EOFException e) {
            // Whatever was committed before the end of the file is kept.
            LOG.warn("Hit EOF on " + log);
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
        return count;
    }

    /**
     * Reports takes that are still pending once every file has been replayed.
     * The error is always logged; the individual pointers are added at debug level.
     */
    private void reportPendingTakes() {
        int pendingTakesSize = this.pendingTakes.size();
        if (pendingTakesSize == 0) {
            return;
        }
        String msg = "Pending takes " + pendingTakesSize + " exist after the end of replay";
        LOG.error(msg + ". Duplicate messages will exist in destination.");
        if (LOG.isDebugEnabled()) {
            for (Long pointer : this.pendingTakes) {
                LOG.debug("Pending take " + FlumeEventPointer.fromLong(pointer.longValue()));
            }
        }
    }

    /**
     * Applies the pointers of a committed transaction to the queue.
     *
     * @param s record type of the committed transaction (PUT or TAKE)
     * @param collection pointers recorded for the transaction
     * @throws IllegalArgumentException if {@code s} is neither PUT nor TAKE
     * @throws IllegalStateException if a pointer cannot be added to the queue, or
     *     cannot be removed again to satisfy a pending take
     */
    private void processCommit(short s, Collection<FlumeEventPointer> collection) {
        if (s == TransactionEventRecord.Type.PUT.get()) {
            for (FlumeEventPointer pointer : collection) {
                Preconditions.checkState(this.queue.addTail(pointer), "Unable to add " + pointer);
                // A take for this pointer was committed earlier in the replay:
                // the event has already been consumed, so drop it again.
                if (this.pendingTakes.remove(Long.valueOf(pointer.toLong()))) {
                    Preconditions.checkState(this.queue.remove(pointer), "Take was pending and pointer was successfully added to the queue but could not be removed: " + pointer);
                }
            }
        } else if (s == TransactionEventRecord.Type.TAKE.get()) {
            for (FlumeEventPointer pointer : collection) {
                // Not in the queue: remember the take so that a put committed
                // later in the replay can be cancelled against it.
                if (!this.queue.remove(pointer)) {
                    this.pendingTakes.add(Long.valueOf(pointer.toLong()));
                }
            }
        } else {
            Preconditions.checkArgument(false, "Unknown record type: " + Integer.toHexString(s));
        }
    }
}
