package org.apache.flume.channel.recoverable.memory.wal;

import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.channel.recoverable.memory.wal.WALDataFile;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/channel/recoverable/memory/wal/WAL.class */
public class WAL<T extends Writable> implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(WAL.class);
    // Root directory of this WAL; holds the "clazz" marker file plus the "data" and "seq" subdirectories.
    private File path;
    // Subdirectory (path/data) containing the entry files.
    private File dataPath;
    // Subdirectory (path/seq) containing the sequence-ID files.
    private File sequenceIDPath;
    // Entry payload type; its name is stored in the "clazz" marker file and checked on open.
    private Class<T> clazz;
    // Writer for the current entry file; replaced by roll(), set to null by close().
    private WALDataFile.Writer<T> dataFileWALWriter;
    // Writer for the current sequence-ID file; replaced by roll(), set to null by close().
    private WALDataFile.Writer<NullWritable> sequenceIDWALWriter;
    // Absolute file path -> largest sequence ID recorded for that file. Filled by roll() and
    // replay(); the background Worker reads it to decide which files can be deleted.
    private Map<String, Long> fileLargestSequenceIDMap;
    // Largest sequence ID seen by writeSequenceIDs() or replay().
    private AtomicLong largestCommitedSequenceID;
    // Set when an append failed, forcing the next write to roll to fresh files.
    private volatile boolean rollRequired;
    // True while roll() is running; waitWhileRolling() blocks until it is cleared.
    private volatile boolean rollInProgress;
    // Threshold compared against the larger of the two writers' getSize() to trigger a roll
    // (presumably bytes -- confirm against WALDataFile.Writer).
    private volatile long rollSize;
    // Combined length in bytes of the tracked files at which the Worker starts deleting files.
    private volatile long maxLogsSize;
    // Minimum age in milliseconds (since last modification) before a file may be deleted.
    private volatile long minLogRetentionPeriod;
    // Sleep time in milliseconds between Worker passes.
    private volatile long workerInterval;
    // Lower bound for the estimated number of sequence IDs in replay(); only set by main().
    private int numReplaySequenceIDOverride;
    // Thread that deletes old files; started by the constructor, stopped by close().
    private Worker backgroundWorker;
    // 64 MiB.
    public static final long DEFAULT_ROLL_SIZE = 67108864;
    // 512 MiB.
    public static final long DEFAULT_MAX_LOGS_SIZE = 536870912;
    // 5 minutes, in milliseconds.
    public static final long DEFAULT_MIN_LOG_RETENTION_PERIOD = 300000;
    // 1 minute, in milliseconds.
    public static final long DEFAULT_WORKER_INTERVAL = 60000;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/channel/recoverable/memory/wal/WAL$Worker.class */
    public static class Worker extends Thread {
        private WAL<? extends Writable> wal;
        private volatile boolean run = true;

        public Worker(WAL<? extends Writable> wal) {
            this.wal = wal;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            WAL.LOG.info("Background worker reporting for duty");
            while (this.run) {
                try {
                    try {
                        Thread.sleep(((WAL) this.wal).workerInterval);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (this.run) {
                        ArrayList newArrayList = Lists.newArrayList();
                        long j = 0;
                        synchronized (((WAL) this.wal).fileLargestSequenceIDMap) {
                            Iterator it = ((WAL) this.wal).fileLargestSequenceIDMap.keySet().iterator();
                            while (it.hasNext()) {
                                j += new File((String) it.next()).length();
                            }
                            if (j >= ((WAL) this.wal).maxLogsSize) {
                                for (String str : ((WAL) this.wal).fileLargestSequenceIDMap.keySet()) {
                                    File file = new File(str);
                                    Long l = (Long) ((WAL) this.wal).fileLargestSequenceIDMap.get(str);
                                    long j2 = ((WAL) this.wal).largestCommitedSequenceID.get();
                                    if (file.exists() && System.currentTimeMillis() - file.lastModified() > ((WAL) this.wal).minLogRetentionPeriod && j2 > l.longValue()) {
                                        newArrayList.add(str);
                                        WAL.LOG.info("Removing expired file " + str + ", seqid = " + l + ", result = " + file.delete());
                                    }
                                }
                                Iterator it2 = newArrayList.iterator();
                                while (it2.hasNext()) {
                                    ((WAL) this.wal).fileLargestSequenceIDMap.remove((String) it2.next());
                                }
                            }
                        }
                    }
                } catch (Exception e2) {
                    WAL.LOG.error("Uncaught exception in background worker", e2);
                }
            }
            WAL.LOG.warn(getClass().getSimpleName() + " moving on due to stop request");
        }

        public void shutdown() {
            this.run = false;
            interrupt();
        }
    }

    /**
     * Opens the WAL in the given directory using the default roll size, maximum logs size,
     * minimum log retention period and worker interval.
     *
     * @param file root directory of the WAL
     * @param cls type of the entries stored in the WAL
     * @throws IOException if the full constructor fails
     */
    WAL(File file, Class<T> cls) throws IOException {
        this(file, cls, DEFAULT_ROLL_SIZE, DEFAULT_MAX_LOGS_SIZE, DEFAULT_MIN_LOG_RETENTION_PERIOD, DEFAULT_WORKER_INTERVAL);
    }

    /**
     * Opens (creating it if needed) the WAL rooted at {@code file}.
     *
     * <p>The directory gets a {@code clazz} marker file holding the entry class name, plus the
     * {@code data} and {@code seq} subdirectories. If the marker already exists it must name
     * {@code cls}. A first pair of files is opened via {@code roll()} and only then is the
     * background worker started, so a failing constructor does not leave a thread behind.
     *
     * @param file root directory of the WAL
     * @param cls type of the entries stored in the WAL
     * @param j roll size
     * @param j2 maximum combined size of the log files before old ones are deleted
     * @param j3 minimum log retention period in milliseconds
     * @param j4 background worker interval in milliseconds
     * @throws IOException if a directory cannot be created, the marker file names a different
     *         class (or is empty), or the first files cannot be opened
     */
    public WAL(File file, Class<T> cls, long j, long j2, long j3, long j4) throws IOException {
        this.fileLargestSequenceIDMap = Collections.synchronizedMap(new HashMap<String, Long>());
        this.largestCommitedSequenceID = new AtomicLong(0L);
        this.path = file;
        this.rollSize = j;
        this.maxLogsSize = j2;
        this.minLogRetentionPeriod = j3;
        this.workerInterval = j4;
        StringBuilder parameters = new StringBuilder();
        parameters.append("path = ").append(file).append(", ");
        parameters.append("rollSize = ").append(j).append(", ");
        parameters.append("maxLogsSize = ").append(j2).append(", ");
        parameters.append("minLogRentionPeriod = ").append(j3).append(", ");
        parameters.append("workerInterval = ").append(j4);
        LOG.info("WAL Parameters: " + parameters);
        File clazzFile = new File(file, "clazz");
        createOrDie(file);
        if (clazzFile.exists()) {
            // readFirstLine returns null for an empty file, so compare from the non-null side.
            String recordedName = Files.readFirstLine(clazzFile, Charsets.UTF_8);
            if (!cls.getName().equals(recordedName)) {
                throw new IOException("WAL is for " + recordedName + " and you are passing " + cls.getName());
            }
        } else {
            Files.write(cls.getName().getBytes(Charsets.UTF_8), clazzFile);
        }
        this.dataPath = new File(file, "data");
        this.sequenceIDPath = new File(file, "seq");
        createOrDie(this.dataPath);
        createOrDie(this.sequenceIDPath);
        this.clazz = cls;
        // Open the first files before starting the worker: if this throws, no thread was started.
        roll();
        this.backgroundWorker = new Worker(this);
        this.backgroundWorker.setName("WAL-Worker-" + file.getAbsolutePath());
        this.backgroundWorker.setDaemon(true);
        this.backgroundWorker.start();
    }

    /**
     * Closes the current writers (recording each file's largest sequence ID) and opens a new
     * pair of files named after the current time in milliseconds. The name is incremented
     * until neither the data file nor the sequence-ID file exists.
     *
     * <p>{@code rollInProgress} is set for the duration of the call; whatever the outcome it is
     * cleared afterwards and all threads waiting on this object are notified.
     *
     * @throws IOException if closing the old writers or opening the new ones fails
     */
    private void roll() throws IOException {
        try {
            rollInProgress = true;
            LOG.info("Rolling WAL " + path);
            if (dataFileWALWriter != null) {
                fileLargestSequenceIDMap.put(
                        dataFileWALWriter.getPath().getAbsolutePath(),
                        Long.valueOf(dataFileWALWriter.getLargestSequenceID()));
                dataFileWALWriter.close();
            }
            if (sequenceIDWALWriter != null) {
                fileLargestSequenceIDMap.put(
                        sequenceIDWALWriter.getPath().getAbsolutePath(),
                        Long.valueOf(sequenceIDWALWriter.getLargestSequenceID()));
                sequenceIDWALWriter.close();
            }
            long stamp = System.currentTimeMillis();
            File nextDataFile = new File(dataPath, Long.toString(stamp));
            File nextSequenceIDFile = new File(sequenceIDPath, Long.toString(stamp));
            // Both files share one name; bump it until it is free in both directories.
            while (nextDataFile.exists() || nextSequenceIDFile.exists()) {
                stamp++;
                nextDataFile = new File(dataPath, Long.toString(stamp));
                nextSequenceIDFile = new File(sequenceIDPath, Long.toString(stamp));
            }
            dataFileWALWriter = new WALDataFile.Writer<T>(nextDataFile);
            sequenceIDWALWriter = new WALDataFile.Writer<NullWritable>(nextSequenceIDFile);
            rollRequired = false;
        } finally {
            rollInProgress = false;
            synchronized (this) {
                notifyAll();
            }
        }
    }

    /**
     * Reads every file of the WAL and returns the entries whose sequence ID was not recorded
     * in any sequence-ID file.
     *
     * <p>Three passes are made: the sequence-ID files are sized to estimate how many IDs to
     * expect, then read into a {@code SequenceIDBuffer} which is sorted, and finally the data
     * files are read and every entry whose ID is not in the buffer is collected. As a side
     * effect the per-file largest sequence IDs and the largest committed sequence ID of this
     * WAL are replaced by the values found on disk.
     *
     * @return the collected entries together with the largest sequence ID seen
     * @throws IOException if a directory contains something that is not a regular file
     */
    public WALReplayResult<T> replay() throws IOException {
        final AtomicLong largestSequenceID = new AtomicLong(0L);
        final Map<String, Long> largestPerFile = Maps.newHashMap();

        // Pass 1: total size of the sequence-ID files, used to size the buffer.
        final AtomicLong sequenceIDBytes = new AtomicLong(0L);
        readFiles(sequenceIDPath, new Function<File, Void>() {
            @Override
            public Void apply(File sequenceIDFile) {
                sequenceIDBytes.addAndGet(sequenceIDFile.length());
                return null;
            }
        });
        int baseSize = WALEntry.getBaseSize();
        int sizeBasedEstimate = ((int) (((float) (sequenceIDBytes.get() / baseSize)) * 1.05f)) + 1;
        int estimatedNumEntries = Math.max(sizeBasedEstimate, numReplaySequenceIDOverride);
        LOG.info("Replay assumptions: baseSize = " + baseSize + ", estimatedNumEntries " + estimatedNumEntries);

        // Pass 2: load every recorded sequence ID into the buffer.
        final SequenceIDBuffer recordedIDs = new SequenceIDBuffer(estimatedNumEntries);
        final AtomicInteger nextSlot = new AtomicInteger(0);
        readFiles(sequenceIDPath, new Function<File, Void>() {
            @Override
            public Void apply(File sequenceIDFile) {
                LOG.info("Replaying " + sequenceIDFile);
                WALDataFile.Reader<NullWritable> reader = null;
                int slot = nextSlot.get();
                try {
                    reader = new WALDataFile.Reader<NullWritable>(sequenceIDFile, NullWritable.class);
                    long largestInFile = Long.MIN_VALUE;
                    List<WALEntry<NullWritable>> batch;
                    while ((batch = reader.nextBatch()) != null) {
                        for (WALEntry<NullWritable> entry : batch) {
                            long sequenceID = entry.getSequenceID();
                            recordedIDs.put(slot++, sequenceID);
                            largestInFile = Math.max(largestInFile, sequenceID);
                        }
                    }
                    largestSequenceID.set(Math.max(largestInFile, largestSequenceID.get()));
                    largestPerFile.put(sequenceIDFile.getAbsolutePath(), Long.valueOf(largestInFile));
                } catch (IOException e) {
                    Throwables.propagate(e);
                } finally {
                    // Remember how far the buffer was filled even when the file failed part-way.
                    nextSlot.set(slot);
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException ignored) {
                            // best effort: the file has already been read
                        }
                    }
                }
                return null;
            }
        });
        recordedIDs.sort();

        // Pass 3: keep the data entries whose sequence ID was never recorded.
        final List<WALEntry<T>> unrecordedEntries = Lists.newArrayList();
        final Class<T> entryClass = clazz;
        readFiles(dataPath, new Function<File, Void>() {
            @Override
            public Void apply(File dataFile) {
                LOG.info("Replaying " + dataFile);
                WALDataFile.Reader<T> reader = null;
                try {
                    reader = new WALDataFile.Reader<T>(dataFile, entryClass);
                    long largestInFile = Long.MIN_VALUE;
                    List<WALEntry<T>> batch;
                    while ((batch = reader.nextBatch()) != null) {
                        for (WALEntry<T> entry : batch) {
                            long sequenceID = entry.getSequenceID();
                            if (!recordedIDs.exists(sequenceID)) {
                                unrecordedEntries.add(entry);
                            }
                            largestInFile = Math.max(largestInFile, sequenceID);
                        }
                    }
                    largestSequenceID.set(Math.max(largestInFile, largestSequenceID.get()));
                    largestPerFile.put(dataFile.getAbsolutePath(), Long.valueOf(largestInFile));
                } catch (IOException e) {
                    Throwables.propagate(e);
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException ignored) {
                            // best effort: the file has already been read
                        }
                    }
                }
                return null;
            }
        });
        recordedIDs.close();

        synchronized (fileLargestSequenceIDMap) {
            fileLargestSequenceIDMap.clear();
            fileLargestSequenceIDMap.putAll(largestPerFile);
            LOG.info("SequenceIDMap " + largestPerFile);
        }
        largestCommitedSequenceID.set(largestSequenceID.get());
        LOG.info("Replay complete: LargestCommitedSequenceID = " + largestCommitedSequenceID.get());
        return new WALReplayResult<T>(unrecordedEntries, largestCommitedSequenceID.get());
    }

    /**
     * Appends the given entries to the current data file, rolling to a new file first when one
     * is required. If the append fails, a roll is requested for the next write.
     *
     * @param list entries to append
     * @throws NullPointerException if the WAL has been closed
     * @throws IOException if rolling or appending fails
     */
    public void writeEntries(List<WALEntry<T>> list) throws IOException {
        Preconditions.checkNotNull(dataFileWALWriter, "Write is null, close must have been called");
        synchronized (this) {
            if (isRollRequired()) {
                roll();
            }
        }
        waitWhileRolling();
        boolean appended = false;
        try {
            dataFileWALWriter.append(list);
            appended = true;
        } finally {
            if (!appended) {
                // The current file may be damaged; start a fresh one on the next write.
                rollRequired = true;
            }
        }
    }

    /**
     * Appends a single entry by wrapping it in a one-element list and delegating to
     * {@link #writeEntries(List)}.
     *
     * @param wALEntry entry to append
     * @throws IOException if {@code writeEntries} fails
     */
    public void writeEntry(WALEntry<T> wALEntry) throws IOException {
        List<WALEntry<T>> single = new ArrayList<WALEntry<T>>(1);
        single.add(wALEntry);
        writeEntries(single);
    }

    /**
     * Records a single sequence ID by wrapping it in a one-element list and delegating to
     * {@link #writeSequenceIDs(List)}.
     *
     * @param j sequence ID to record
     * @throws IOException if {@code writeSequenceIDs} fails
     */
    public void writeSequenceID(long j) throws IOException {
        List<Long> single = new ArrayList<Long>(1);
        single.add(Long.valueOf(j));
        writeSequenceIDs(single);
    }

    /**
     * Records the given sequence IDs in the current sequence-ID file, rolling to a new file
     * first when one is required, and raises the largest committed sequence ID to the largest
     * value in {@code list}. If anything fails after the roll check, a roll is requested for
     * the next write.
     *
     * <p>Every ID is written exactly once, in a single append after the whole batch has been
     * built. Nothing is appended for an empty list.
     *
     * @param list sequence IDs to record
     * @throws NullPointerException if the WAL has been closed
     * @throws IOException if rolling or appending fails
     */
    public void writeSequenceIDs(List<Long> list) throws IOException {
        Preconditions.checkNotNull(sequenceIDWALWriter, "Write is null, close must have been called");
        synchronized (this) {
            if (isRollRequired()) {
                roll();
            }
        }
        waitWhileRolling();
        boolean appended = false;
        try {
            List<WALEntry<NullWritable>> entries = new ArrayList<WALEntry<NullWritable>>(list.size());
            for (Long id : list) {
                long sequenceID = id.longValue();
                // Atomic "set to max": a plain get-then-set could overwrite a larger value
                // stored concurrently by another writer.
                long current = largestCommitedSequenceID.get();
                while (sequenceID > current
                        && !largestCommitedSequenceID.compareAndSet(current, sequenceID)) {
                    current = largestCommitedSequenceID.get();
                }
                entries.add(new WALEntry<NullWritable>(NullWritable.get(), sequenceID));
            }
            // Append once, after the loop; appending the growing list on every iteration
            // would write earlier IDs again and again.
            if (!entries.isEmpty()) {
                sequenceIDWALWriter.append(entries);
            }
            appended = true;
        } finally {
            if (!appended) {
                rollRequired = true;
            }
        }
    }

    /**
     * Blocks until no roll is in progress.
     *
     * <p>An interrupt does not end the wait. It is remembered and the thread's interrupt status
     * is restored once the roll has finished. Restoring it inside the loop would make every
     * following {@code wait()} throw immediately, turning the wait into a busy loop that holds
     * this object's monitor.
     */
    private void waitWhileRolling() {
        boolean interrupted = false;
        synchronized (this) {
            while (rollInProgress) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stops the background worker and closes both writers. Later writes fail because the
     * writer fields are set to {@code null}.
     *
     * <p>The data writer is closed even when closing the sequence-ID writer throws, so a
     * failure on one file does not leave the other one open.
     *
     * @throws IOException if closing either writer fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (backgroundWorker != null) {
            backgroundWorker.shutdown();
        }
        try {
            if (sequenceIDWALWriter != null) {
                try {
                    sequenceIDWALWriter.close();
                } finally {
                    sequenceIDWALWriter = null;
                }
            }
        } finally {
            if (dataFileWALWriter != null) {
                try {
                    dataFileWALWriter.close();
                } finally {
                    dataFileWALWriter = null;
                }
            }
        }
    }

    /**
     * Tells whether the writers must be rolled before the next append: either a roll was
     * explicitly requested or the larger of the two current files exceeds {@code rollSize}.
     *
     * @return {@code true} if a roll is needed
     * @throws IOException if a writer cannot report its size
     */
    private boolean isRollRequired() throws IOException {
        if (rollRequired) {
            return true;
        }
        return Math.max(dataFileWALWriter.getSize(), sequenceIDWALWriter.getSize()) > rollSize;
    }

    /**
     * Applies {@code function} to every file directly inside the directory {@code file}.
     *
     * <p>All children are checked before the first call: if any of them is not a regular file
     * an exception is thrown and {@code function} is not invoked at all. Nothing happens when
     * the directory cannot be listed.
     *
     * @param file directory to scan
     * @param function callback invoked once per file
     * @throws IOException if a child of the directory is not a regular file
     */
    private void readFiles(File file, Function<File, Void> function) throws IOException {
        File[] children = file.listFiles();
        if (children == null) {
            return;
        }
        List<File> regularFiles = new ArrayList<File>(children.length);
        for (File child : children) {
            if (!child.isFile()) {
                throw new IOException("Not file " + child);
            }
            regularFiles.add(child);
        }
        for (File regularFile : regularFiles) {
            function.apply(regularFile);
        }
    }

    /**
     * Makes sure {@code file} is a directory, creating it (and missing parents) if necessary.
     *
     * @param file directory that must exist afterwards
     * @throws IOException if it is not a directory and cannot be created
     */
    private void createOrDie(File file) throws IOException {
        if (file.isDirectory()) {
            return;
        }
        if (file.mkdirs()) {
            return;
        }
        throw new IOException("Unable to create " + file);
    }

    /**
     * Changes the size threshold above which the current files are rolled.
     *
     * @param j new roll size
     */
    public void setRollSize(long j) {
        this.rollSize = j;
    }

    /**
     * Changes the combined size of the tracked files at which the background worker starts
     * deleting old files.
     *
     * @param j new maximum logs size in bytes
     */
    public void setMaxLogsSize(long j) {
        this.maxLogsSize = j;
    }

    /**
     * Changes how long a file must have been unmodified before the background worker may
     * delete it.
     *
     * @param j new minimum retention period in milliseconds
     */
    public void setMinLogRetentionPeriod(long j) {
        this.minLogRetentionPeriod = j;
    }

    /**
     * Changes how long the background worker sleeps between two passes.
     *
     * @param j new interval in milliseconds
     */
    public void setWorkerInterval(long j) {
        this.workerInterval = j;
    }

    /**
     * Command line tool: replays the WAL in one directory and writes the entries returned by
     * the replay into a WAL in another directory.
     *
     * <p>Arguments: input directory, output directory, entry class name and, optionally, a
     * lower bound for the number of sequence IDs expected during replay. Both WALs are closed
     * even when replaying or writing fails.
     *
     * @param strArr command line arguments as described above
     * @throws IndexOutOfBoundsException if one of the three required arguments is missing
     * @throws IOException if a WAL cannot be opened, replayed or written
     * @throws ClassNotFoundException if the entry class cannot be loaded
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the entry type is only known at runtime
    public static void main(String[] strArr) throws IOException, ClassNotFoundException {
        // checkElementIndex, unlike checkPositionIndex, rejects index == length.
        Preconditions.checkElementIndex(0, strArr.length, "input directory is a required arg");
        Preconditions.checkElementIndex(1, strArr.length, "output directory is a required arg");
        Preconditions.checkElementIndex(2, strArr.length, "classname is a required arg");
        String inputDirectory = strArr[0];
        String outputDirectory = strArr[1];
        Class entryClass = Class.forName(strArr[2].trim());

        WALReplayResult replayed;
        WAL inputWAL = new WAL(new File(inputDirectory), entryClass);
        try {
            if (strArr.length == 4) {
                inputWAL.numReplaySequenceIDOverride = Integer.parseInt(strArr[3]);
                System.out.println("Overridng numReplaySequenceIDOverride: " + inputWAL.numReplaySequenceIDOverride);
            }
            replayed = inputWAL.replay();
        } finally {
            inputWAL.close();
        }
        System.out.println("     SeqID: " + replayed.getSequenceID());
        System.out.println("NumEntries: " + replayed.getResults().size());

        WAL outputWAL = new WAL(new File(outputDirectory), entryClass);
        try {
            outputWAL.writeEntries(replayed.getResults());
        } finally {
            outputWAL.close();
        }
    }
}
