package org.apache.flume.channel.file;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.LongBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:META-INF/bundled-dependencies/flume-file-channel-1.11.0.jar:org/apache/flume/channel/file/EventQueueBackingStoreFile.class */
/**
 * {@link EventQueueBackingStore} whose state lives in a memory-mapped checkpoint file.
 *
 * <p>The checkpoint file is read and written as an array of {@code long} slots. The first
 * {@link #HEADER_SIZE} slots form the header (version at {@link #INDEX_VERSION}, log write order
 * ID at {@link #INDEX_WRITE_ORDER_ID}, checkpoint marker at {@link #INDEX_CHECKPOINT_MARKER});
 * they are followed by one slot per queue element, so the file is exactly
 * {@code (capacity + HEADER_SIZE) * 8} bytes long.
 *
 * <p>Element writes made through {@link #put(int, long)} are buffered in {@link #overwriteMap}
 * and are only copied into the mapped file by {@link #checkpoint()}. When backups are enabled,
 * a completed checkpoint is copied to the backup directory on a dedicated single-thread executor.
 */
public abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
    private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFile.class);

    /** Largest zero-filled chunk that {@link #allocate(File, long)} writes in one call (2 MiB). */
    private static final int MAX_ALLOC_BUFFER_SIZE = 2 * 1024 * 1024;

    /** Size in bytes of one checkpoint slot; every slot is a single {@code long}. */
    private static final int SLOT_SIZE_BYTES = 8;

    /** Number of {@code long} slots reserved at the start of the file before the first element. */
    protected static final int HEADER_SIZE = 1029;

    /** Header slot holding the checkpoint format version. */
    protected static final int INDEX_VERSION = 0;

    /** Header slot holding the log write order ID recorded by the last checkpoint. */
    protected static final int INDEX_WRITE_ORDER_ID = 1;

    /** Header slot holding {@link #CHECKPOINT_COMPLETE} or {@link #CHECKPOINT_INCOMPLETE}. */
    protected static final int INDEX_CHECKPOINT_MARKER = 4;

    /** Marker value written once a checkpoint has been fully flushed. */
    protected static final int CHECKPOINT_COMPLETE = 0;

    /** Marker value written while a checkpoint is in progress. */
    protected static final int CHECKPOINT_INCOMPLETE = 1;

    /** Suffix appended to the checkpoint file name when the backup copy is compressed. */
    protected static final String COMPRESSED_FILE_EXTENSION = ".snappy";

    /** {@code long} view over {@link #mappedBuffer}; indexes are slot numbers, header included. */
    protected LongBuffer elementsBuffer;

    /** Element writes not yet checkpointed, keyed by physical slot index. */
    protected final Map<Integer, Long> overwriteMap = new HashMap<>();

    /** Reference count per log file ID; an entry is removed when its count drops to zero. */
    protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();

    /** Read-write mapping of the whole checkpoint file. */
    protected final MappedByteBuffer mappedBuffer;

    /** Open handle on the checkpoint file; closed by {@link #close()}. */
    protected final RandomAccessFile checkpointFileHandle;

    private final FileChannelCounter fileChannelCounter;

    /** The checkpoint file this store is mapped onto. */
    protected final File checkpointFile;

    /**
     * Single permit that is taken by {@link #beginCheckpoint()} and handed back by the backup
     * task when it finishes, so that a new checkpoint cannot start while a backup is running.
     */
    private final Semaphore backupCompletedSema = new Semaphore(1);

    /** Whether each completed checkpoint is copied to {@link #backupDir}. */
    protected final boolean shouldBackup;

    /** Whether the checkpoint file itself is compressed when it is backed up. */
    protected final boolean compressBackup;

    private final File backupDir;

    /** Runs checkpoint backups; {@code null} when {@link #shouldBackup} is {@code false}. */
    private final ExecutorService checkpointBackUpExecutor;

    /**
     * Creates a store without checkpoint backups.
     *
     * @param i queue capacity, in elements
     * @param str name handed to the superclass and used in error messages
     * @param fileChannelCounter counter that receives backup error notifications
     * @param file checkpoint file to create or open
     * @throws IOException if the checkpoint file cannot be created, sized or mapped
     * @throws BadCheckpointException if an existing checkpoint file fails validation
     */
    public EventQueueBackingStoreFile(int i, String str, FileChannelCounter fileChannelCounter, File file) throws IOException, BadCheckpointException {
        this(i, str, fileChannelCounter, file, null, false, false);
    }

    /**
     * Creates or opens the checkpoint file, validates it and maps it into memory.
     *
     * <p>An empty file is preallocated to the size required by the capacity and stamped with the
     * current version. A non-empty file must already have exactly that size, carry the expected
     * version and be marked {@link #CHECKPOINT_COMPLETE}. If anything fails after the file has
     * been opened, the file handle is closed before the exception is rethrown.
     *
     * @param i queue capacity, in elements
     * @param str name handed to the superclass and used in error messages
     * @param fileChannelCounter counter that receives backup error notifications
     * @param file checkpoint file to create or open
     * @param file2 directory that receives checkpoint backups; only used when {@code z} is true
     * @param z whether completed checkpoints are backed up
     * @param z2 whether the checkpoint file is compressed in the backup
     * @throws IOException if the checkpoint file cannot be created, sized or mapped
     * @throws BadCheckpointException if the file size does not match the capacity, the version is
     *     unexpected, or the previous checkpoint was left incomplete
     * @throws IllegalArgumentException if the capacity needs a file larger than a single memory
     *     mapping can cover
     */
    public EventQueueBackingStoreFile(int i, String str, FileChannelCounter fileChannelCounter, File file, File file2, boolean z, boolean z2) throws IOException, BadCheckpointException {
        super(i, str);
        this.fileChannelCounter = fileChannelCounter;
        this.checkpointFile = file;
        this.shouldBackup = z;
        this.compressBackup = z2;
        this.backupDir = file2;
        this.checkpointFileHandle = new RandomAccessFile(file, "rw");
        try {
            // Widen before multiplying: in int arithmetic this overflows for large capacities.
            final long expectedBytes = ((long) i + HEADER_SIZE) * SLOT_SIZE_BYTES;
            // FileChannel.map cannot map more than Integer.MAX_VALUE bytes, so fail before
            // preallocating a file that could never be mapped.
            Preconditions.checkArgument(expectedBytes <= Integer.MAX_VALUE,
                    "Capacity " + i + " requires a checkpoint file of " + expectedBytes
                            + " bytes, which exceeds the " + Integer.MAX_VALUE
                            + " byte limit of a single memory mapping");
            if (this.checkpointFileHandle.length() == 0) {
                allocate(file, expectedBytes);
                this.checkpointFileHandle.seek(0L);
                this.checkpointFileHandle.writeLong(getVersion());
                this.checkpointFileHandle.getChannel().force(true);
                LOG.info("Preallocated {} to {} for capacity {}", file, this.checkpointFileHandle.length(), i);
            }
            if (file.length() != expectedBytes) {
                throw new BadCheckpointException("Configured capacity is " + i + " but the  checkpoint file capacity is " + ((file.length() / SLOT_SIZE_BYTES) - HEADER_SIZE) + ". See FileChannel documentation on how to change a channels capacity.");
            }
            this.mappedBuffer = this.checkpointFileHandle.getChannel().map(FileChannel.MapMode.READ_WRITE, 0L, file.length());
            this.elementsBuffer = this.mappedBuffer.asLongBuffer();
            final long storedVersion = this.elementsBuffer.get(INDEX_VERSION);
            if (storedVersion != getVersion()) {
                throw new BadCheckpointException("Invalid version: " + storedVersion + " " + str + ", expected " + getVersion());
            }
            if (this.elementsBuffer.get(INDEX_CHECKPOINT_MARKER) != CHECKPOINT_COMPLETE) {
                throw new BadCheckpointException("Checkpoint was not completed correctly, probably because the agent stopped while the channel was checkpointing.");
            }
        } catch (Throwable failure) {
            // The instance is never handed to the caller, so nobody else can close the handle.
            try {
                this.checkpointFileHandle.close();
            } catch (IOException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
        this.checkpointBackUpExecutor = this.shouldBackup
                ? Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(getName() + " - CheckpointBackUpThread").build())
                : null;
    }

    /**
     * Returns the log write order ID stored in the checkpoint header.
     *
     * @return the value of the {@link #INDEX_WRITE_ORDER_ID} slot
     */
    public long getCheckpointLogWriteOrderID() {
        return this.elementsBuffer.get(INDEX_WRITE_ORDER_ID);
    }

    /**
     * Persists subclass-specific checkpoint metadata.
     *
     * <p>Called by {@link #checkpoint()} after the write order ID slot has been updated and
     * before the buffered element writes are copied into the mapped file.
     *
     * @throws IOException if the metadata cannot be written
     */
    protected abstract void writeCheckpointMetaData() throws IOException;

    /**
     * Copies the contents of the checkpoint directory into the given backup directory.
     *
     * <p>The previous completion marker and backup files are removed first, every file of the
     * checkpoint directory that is not listed in {@code Log.EXCLUDES} is copied (the checkpoint
     * file is compressed instead when {@link #compressBackup} is set), and finally a new
     * completion marker is created. A failure to create the marker is only logged.
     *
     * @param file backup directory to write to
     * @throws IOException if the old completion marker cannot be removed or a copy fails
     * @throws IllegalStateException if the backup permit is unexpectedly available, or if the
     *     completion marker reappears while the backup is running
     */
    protected void backupCheckpoint(File file) throws IOException {
        final int availablePermits = this.backupCompletedSema.drainPermits();
        Preconditions.checkState(availablePermits == 0, "Expected no permits to be available in the backup semaphore, but " + availablePermits + " permits were available.");
        if (this.slowdownBackup.booleanValue()) {
            try {
                TimeUnit.SECONDS.sleep(10L);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever runs this task before propagating.
                Thread.currentThread().interrupt();
                throw Throwables.propagate(e);
            }
        }
        final File completionMarker = new File(file, EventQueueBackingStore.BACKUP_COMPLETE_FILENAME);
        if (backupExists(file) && !completionMarker.delete()) {
            throw new IOException("Error while doing backup of checkpoint. Could not remove " + completionMarker.toString() + ".");
        }
        Serialization.deleteAllFiles(file, Log.EXCLUDES);
        final File[] checkpointDirFiles = this.checkpointFile.getParentFile().listFiles();
        Preconditions.checkNotNull(checkpointDirFiles, "Could not retrieve files from the checkpoint directory. Cannot complete backup of the checkpoint.");
        for (File source : checkpointDirFiles) {
            if (Log.EXCLUDES.contains(source.getName())) {
                continue;
            }
            if (this.compressBackup && source.equals(this.checkpointFile)) {
                Serialization.compressFile(source, new File(file, source.getName() + COMPRESSED_FILE_EXTENSION));
            } else {
                Serialization.copyFile(source, new File(file, source.getName()));
            }
        }
        Preconditions.checkState(!completionMarker.exists(), "The backup file exists while it is not supposed to. Are multiple channels configured to use this directory: " + file.toString() + " as backup?");
        if (!completionMarker.createNewFile()) {
            LOG.error("Could not create backup file. Backup of checkpoint will not be used during replay even if checkpoint is bad.");
        }
    }

    /**
     * Replaces the contents of a checkpoint directory with a completed backup.
     *
     * <p>Nothing is touched unless the backup carries a completion marker and its directory can
     * be listed. Files ending in {@link #COMPRESSED_FILE_EXTENSION} are decompressed to the same
     * name without that suffix; the completion marker and the lock file are not copied.
     *
     * @param file checkpoint directory to restore into
     * @param file2 backup directory to restore from
     * @return {@code true} if the backup was restored, {@code false} if there was no usable backup
     * @throws IOException if copying or decompressing a file fails
     */
    public static boolean restoreBackup(File file, File file2) throws IOException {
        if (!backupExists(file2)) {
            return false;
        }
        // List the backup before deleting anything, so an unreadable backup directory does not
        // cost us the existing checkpoint files.
        final File[] backupFiles = file2.listFiles();
        if (backupFiles == null) {
            return false;
        }
        Serialization.deleteAllFiles(file, Log.EXCLUDES);
        for (File backupFile : backupFiles) {
            final String name = backupFile.getName();
            if (name.equals(EventQueueBackingStore.BACKUP_COMPLETE_FILENAME) || name.equals(Log.FILE_LOCK)) {
                continue;
            }
            if (name.endsWith(COMPRESSED_FILE_EXTENSION)) {
                final String restoredName = name.substring(0, name.length() - COMPRESSED_FILE_EXTENSION.length());
                Serialization.decompressFile(backupFile, new File(file, restoredName));
            } else {
                Serialization.copyFile(backupFile, new File(file, name));
            }
        }
        return true;
    }

    /**
     * Marks the checkpoint file as incomplete and forces the marker to disk.
     *
     * <p>When backups are enabled the single backup permit is taken first; it is handed back by
     * the backup task scheduled from {@link #checkpoint()}.
     * NOTE(review): nothing in this class returns the permit if {@link #checkpoint()} fails
     * before the backup task is scheduled - confirm how callers recover from that.
     *
     * @throws IOException if the previous backup is still running
     * @throws IllegalStateException if more than one backup permit was available
     */
    @Override
    public void beginCheckpoint() throws IOException {
        LOG.info("Start checkpoint for {}, elements to sync = {}", this.checkpointFile, this.overwriteMap.size());
        if (this.shouldBackup) {
            final int permits = this.backupCompletedSema.drainPermits();
            Preconditions.checkState(permits <= 1, "Expected only one or less permits to checkpoint, but got " + permits + " permits");
            if (permits < 1) {
                throw new IOException("Previous backup of checkpoint files is still in progress. Will attempt to checkpoint only at the end of the next checkpoint interval. Try increasing the checkpoint interval if this error happens often.");
            }
        }
        this.elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
        this.mappedBuffer.force();
    }

    /**
     * Writes a new checkpoint: header, subclass metadata and all buffered element writes.
     *
     * <p>After the buffered writes have been copied into the mapped file the checkpoint is
     * marked complete and forced to disk; a backup is then scheduled if backups are enabled.
     *
     * @throws IOException if the subclass metadata cannot be written
     * @throws IllegalStateException if buffered writes remain after they have all been flushed
     */
    @Override
    public void checkpoint() throws IOException {
        setLogWriteOrderID(WriteOrderOracle.next());
        LOG.info("Updating checkpoint metadata: logWriteOrderID: {}, queueSize: {}, queueHead: {}", getLogWriteOrderID(), getSize(), getHead());
        this.elementsBuffer.put(INDEX_WRITE_ORDER_ID, getLogWriteOrderID());
        try {
            writeCheckpointMetaData();
        } catch (IOException e) {
            throw new IOException("Error writing metadata", e);
        }
        // Walk the entries directly so each buffered write costs a single map traversal.
        final Iterator<Map.Entry<Integer, Long>> pendingWrites = this.overwriteMap.entrySet().iterator();
        while (pendingWrites.hasNext()) {
            final Map.Entry<Integer, Long> write = pendingWrites.next();
            final int slot = write.getKey();
            final long value = write.getValue();
            this.elementsBuffer.put(slot, value);
            pendingWrites.remove();
        }
        Preconditions.checkState(this.overwriteMap.isEmpty(), "concurrent update detected ");
        this.elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
        this.mappedBuffer.force();
        if (this.shouldBackup) {
            startBackupThread();
        }
    }

    /**
     * Schedules a backup of the checkpoint directory on the backup executor.
     *
     * <p>The task counts and logs any failure, and always hands the backup permit back.
     */
    private void startBackupThread() {
        Preconditions.checkNotNull(this.checkpointBackUpExecutor, "Expected the checkpoint backup exector to be non-null, but it is null. Checkpoint will not be backed up.");
        LOG.info("Attempting to back up checkpoint.");
        this.checkpointBackUpExecutor.submit(new Runnable() {
            @Override
            public void run() {
                boolean failed = false;
                try {
                    backupCheckpoint(EventQueueBackingStoreFile.this.backupDir);
                } catch (Throwable th) {
                    EventQueueBackingStoreFile.this.fileChannelCounter.incrementCheckpointBackupWriteErrorCount();
                    failed = true;
                    LOG.error("Backing up of checkpoint directory failed.", th);
                } finally {
                    // Returned on success and failure alike so the next checkpoint can start.
                    EventQueueBackingStoreFile.this.backupCompletedSema.release();
                }
                if (!failed) {
                    LOG.info("Checkpoint backup completed.");
                }
            }
        });
    }

    /**
     * Forces the mapped buffer to disk, closes the checkpoint file and shuts the backup executor
     * down, waiting for a running backup to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored.
     */
    @Override
    public void close() {
        this.mappedBuffer.force();
        try {
            this.checkpointFileHandle.close();
        } catch (IOException e) {
            LOG.info("Error closing " + this.checkpointFile, e);
        }
        if (this.checkpointBackUpExecutor == null || this.checkpointBackUpExecutor.isShutdown()) {
            return;
        }
        this.checkpointBackUpExecutor.shutdown();
        try {
            // Poll until the executor has terminated.
            while (!this.checkpointBackUpExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
                // keep waiting
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for checkpoint backup to complete");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the value of a queue element, preferring a buffered write over the mapped file.
     *
     * @param i logical index of the element, relative to the queue head
     * @return the buffered value for that slot if there is one, otherwise the checkpointed value
     */
    @Override
    public long get(int i) {
        final int physicalIndex = getPhysicalIndex(i);
        final Long pending = this.overwriteMap.get(physicalIndex);
        if (pending != null) {
            return pending;
        }
        return this.elementsBuffer.get(physicalIndex);
    }

    /**
     * Returns the IDs of all log files that currently have a reference count.
     *
     * @return a sorted, immutable snapshot of the referenced log file IDs
     */
    @Override
    public ImmutableSortedSet<Integer> getReferenceCounts() {
        return ImmutableSortedSet.copyOf(this.logFileIDReferenceCounts.keySet());
    }

    /**
     * Buffers a write to a queue element; it reaches the file on the next {@link #checkpoint()}.
     *
     * @param i logical index of the element, relative to the queue head
     * @param j value to store
     */
    @Override
    public void put(int i, long j) {
        this.overwriteMap.put(getPhysicalIndex(i), j);
    }

    /**
     * Tells whether there are buffered writes that have not been checkpointed yet.
     *
     * @return {@code true} if at least one write is buffered
     */
    @Override
    public boolean syncRequired() {
        return !this.overwriteMap.isEmpty();
    }

    /**
     * Increments the reference count of a log file, creating the counter on first use.
     *
     * @param i log file ID
     */
    @Override
    public void incrementFileID(int i) {
        AtomicInteger counter = this.logFileIDReferenceCounts.get(i);
        if (counter == null) {
            counter = new AtomicInteger(0);
            this.logFileIDReferenceCounts.put(i, counter);
        }
        counter.incrementAndGet();
    }

    /**
     * Decrements the reference count of a log file and drops the counter when it reaches zero.
     *
     * @param i log file ID
     * @throws IllegalStateException if the log file has no counter
     */
    @Override
    public void decrementFileID(int i) {
        final AtomicInteger counter = this.logFileIDReferenceCounts.get(i);
        Preconditions.checkState(counter != null, "null counter ");
        if (counter.decrementAndGet() == 0) {
            this.logFileIDReferenceCounts.remove(i);
        }
    }

    /**
     * Maps a logical queue index to its slot in the checkpoint file.
     *
     * @param i logical index, relative to the queue head
     * @return the slot index, wrapped around the capacity and offset by {@link #HEADER_SIZE}
     */
    protected int getPhysicalIndex(int i) {
        return HEADER_SIZE + ((getHead() + i) % getCapacity());
    }

    /**
     * Fills the start of a file with the given number of zero bytes.
     *
     * <p>The zeros are written in chunks of at most {@link #MAX_ALLOC_BUFFER_SIZE} bytes. The
     * file is always closed; a failure to close is only reported when the writes succeeded,
     * otherwise it is attached to the write failure as a suppressed exception.
     *
     * @param file file to write to
     * @param j number of zero bytes to write
     * @throws IOException if the file cannot be opened, written or closed
     */
    protected static void allocate(File file, long j) throws IOException {
        try (RandomAccessFile target = new RandomAccessFile(file, "rw")) {
            if (j <= MAX_ALLOC_BUFFER_SIZE) {
                target.write(new byte[(int) j]);
                return;
            }
            final byte[] zeros = new byte[MAX_ALLOC_BUFFER_SIZE];
            long remaining = j;
            while (remaining >= MAX_ALLOC_BUFFER_SIZE) {
                target.write(zeros);
                remaining -= MAX_ALLOC_BUFFER_SIZE;
            }
            if (remaining > 0) {
                target.write(zeros, 0, (int) remaining);
            }
        }
    }

    /**
     * Tells whether a backup directory holds a completed backup.
     *
     * @param file backup directory
     * @return {@code true} if the completion marker file exists in that directory
     */
    public static boolean backupExists(File file) {
        return new File(file, EventQueueBackingStore.BACKUP_COMPLETE_FILENAME).exists();
    }

    /**
     * Debugging entry point that dumps a checkpoint file to standard output.
     *
     * <p>Prints the reference counts, capacity, size and head of the queue, every element slot,
     * and the inflight puts and takes read through a {@code FlumeEventQueue}.
     *
     * @param strArr four paths: the checkpoint file, followed by the three files handed to the
     *     {@code FlumeEventQueue} constructor in that order
     * @throws IllegalArgumentException if fewer than four arguments are given
     * @throws IOException if the checkpoint file does not exist or is empty
     * @throws Exception if the checkpoint cannot be opened or read
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr == null || strArr.length < 4) {
            throw new IllegalArgumentException("Expected 4 arguments (the checkpoint file followed by the three files passed to FlumeEventQueue) but got " + (strArr == null ? 0 : strArr.length));
        }
        final File file = new File(strArr[0]);
        final File file2 = new File(strArr[1]);
        final File file3 = new File(strArr[2]);
        final File file4 = new File(strArr[3]);
        if (!file.exists()) {
            throw new IOException("File " + file + " does not exist");
        }
        if (file.length() == 0) {
            throw new IOException("File " + file + " is empty");
        }
        // Invert the size formula used by the constructor to recover the capacity.
        final long headerBytes = (long) HEADER_SIZE * SLOT_SIZE_BYTES;
        final int capacity = (int) ((file.length() - headerBytes) / SLOT_SIZE_BYTES);
        final EventQueueBackingStoreFile store = (EventQueueBackingStoreFile) EventQueueBackingStoreFactory.get(file, capacity, "debug", new FileChannelCounter("Main"), false);
        System.out.println("File Reference Counts" + store.logFileIDReferenceCounts);
        System.out.println("Queue Capacity " + store.getCapacity());
        System.out.println("Queue Size " + store.getSize());
        System.out.println("Queue Head " + store.getHead());
        for (int i = 0; i < store.getCapacity(); i++) {
            // get() already maps the logical index to its slot; translating it here as well
            // would print every element under the wrong index.
            System.out.println(i + ":" + describePointer(store.get(i)));
        }
        final FlumeEventQueue queue = new FlumeEventQueue(store, file2, file3, file4);
        final SetMultimap<Long, Long> inflightPuts = queue.deserializeInflightPuts();
        System.out.println("Inflight Puts:");
        printInflight(inflightPuts);
        final SetMultimap<Long, Long> inflightTakes = queue.deserializeInflightTakes();
        System.out.println("Inflight takes:");
        printInflight(inflightTakes);
    }

    /** Prints every transaction ID of an inflight map followed by its event pointers. */
    private static void printInflight(SetMultimap<Long, Long> inflight) {
        for (Long transactionId : inflight.keySet()) {
            final Set<Long> pointers = inflight.get(transactionId);
            System.out.println("Transaction ID: " + String.valueOf(transactionId));
            for (Long pointer : pointers) {
                System.out.println(describePointer(pointer.longValue()));
            }
        }
    }

    /** Formats an event pointer as hex plus its upper 32 bits (file ID) and lower 32 bits (offset). */
    private static String describePointer(long pointer) {
        return Long.toHexString(pointer) + " fileID = " + ((int) (pointer >>> 32)) + ", offset = " + ((int) pointer);
    }
}
