package org.apache.flume.channel.file;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelFullException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.Disposable;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.Log;
import org.apache.flume.channel.file.encryption.EncryptionConfiguration;
import org.apache.flume.channel.file.encryption.KeyProvider;
import org.apache.flume.channel.file.encryption.KeyProviderFactory;
import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.apache.flume.conf.TransactionCapacitySupported;
import org.apache.flume.source.http.BLOBHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Stable
@Disposable
/* loaded from: input_file:META-INF/bundled-dependencies/flume-file-channel-1.9.0.jar:org/apache/flume/channel/file/FileChannel.class */
public class FileChannel extends BasicChannelSemantics implements TransactionCapacitySupported {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FileChannel.class);
    private int keepAlive;
    private long maxFileSize;
    private long minimumRequiredSpace;
    private File checkpointDir;
    private File backupCheckpointDir;
    private File[] dataDirs;
    private Log log;
    private volatile boolean open;
    private volatile Throwable startupError;
    private Semaphore queueRemaining;
    private FileChannelCounter channelCounter;
    private boolean useLogReplayV1;
    private KeyProvider encryptionKeyProvider;
    private String encryptionActiveKey;
    private String encryptionCipherProvider;
    private boolean useDualCheckpoints;
    private boolean compressBackupCheckpoint;
    private boolean fsyncPerTransaction;
    private int fsyncInterval;
    private Integer capacity = 0;
    protected Integer transactionCapacity = 0;
    private Long checkpointInterval = 0L;
    private final ThreadLocal<FileBackedTransaction> transactions = new ThreadLocal<>();
    private String channelNameDescriptor = "[channel=unknown]";
    private boolean useFastReplay = false;
    private boolean checkpointOnClose = true;

    /* loaded from: input_file:META-INF/bundled-dependencies/flume-file-channel-1.9.0.jar:org/apache/flume/channel/file/FileChannel$FileBackedTransaction.class */
    static class FileBackedTransaction extends BasicTransactionSemantics {
        private final LinkedBlockingDeque<FlumeEventPointer> takeList;
        private final LinkedBlockingDeque<FlumeEventPointer> putList;
        private final long transactionID;
        private final int keepAlive;
        private final Log log;
        private final FlumeEventQueue queue;
        private final Semaphore queueRemaining;
        private final String channelNameDescriptor;
        private final FileChannelCounter channelCounter;
        private final boolean fsyncPerTransaction;

        public FileBackedTransaction(Log log, long j, int i, int i2, Semaphore semaphore, String str, boolean z, FileChannelCounter fileChannelCounter) {
            this.log = log;
            this.queue = log.getFlumeEventQueue();
            this.transactionID = j;
            this.keepAlive = i2;
            this.queueRemaining = semaphore;
            this.putList = new LinkedBlockingDeque<>(i);
            this.takeList = new LinkedBlockingDeque<>(i);
            this.fsyncPerTransaction = z;
            this.channelNameDescriptor = "[channel=" + str + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END;
            this.channelCounter = fileChannelCounter;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isClosed() {
            return BasicTransactionSemantics.State.CLOSED.equals(getState());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getStateAsString() {
            return String.valueOf(getState());
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected void doPut(Event event) throws InterruptedException {
            this.channelCounter.incrementEventPutAttemptCount();
            if (this.putList.remainingCapacity() == 0) {
                throw new ChannelException("Put queue for FileBackedTransaction of capacity " + this.putList.size() + " full, consider committing more frequently, increasing capacity or increasing thread count. " + this.channelNameDescriptor);
            }
            if (!this.queueRemaining.tryAcquire(this.keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelFullException("The channel has reached it's capacity. This might be the result of a sink on the channel having too low of batch size, a downstream system running slower than normal, or that the channel capacity is just too low. " + this.channelNameDescriptor);
            }
            boolean z = false;
            this.log.lockShared();
            try {
                try {
                    FlumeEventPointer put = this.log.put(this.transactionID, event);
                    Preconditions.checkState(this.putList.offer(put), "putList offer failed " + this.channelNameDescriptor);
                    this.queue.addWithoutCommit(put, this.transactionID);
                    z = true;
                    this.log.unlockShared();
                    if (1 == 0) {
                        this.queueRemaining.release();
                    }
                } catch (IOException e) {
                    this.channelCounter.incrementEventPutErrorCount();
                    throw new ChannelException("Put failed due to IO error " + this.channelNameDescriptor, e);
                }
            } catch (Throwable th) {
                this.log.unlockShared();
                if (!z) {
                    this.queueRemaining.release();
                }
                throw th;
            }
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected Event doTake() throws InterruptedException {
            this.channelCounter.incrementEventTakeAttemptCount();
            if (this.takeList.remainingCapacity() == 0) {
                throw new ChannelException("Take list for FileBackedTransaction, capacity " + this.takeList.size() + " full, consider committing more frequently, increasing capacity, or increasing thread count. " + this.channelNameDescriptor);
            }
            this.log.lockShared();
            while (true) {
                try {
                    FlumeEventPointer removeHead = this.queue.removeHead(this.transactionID);
                    if (removeHead == null) {
                        return null;
                    }
                    try {
                        try {
                            try {
                                Preconditions.checkState(this.takeList.offer(removeHead), "takeList offer failed " + this.channelNameDescriptor);
                                this.log.take(this.transactionID, removeHead);
                                FlumeEvent flumeEvent = this.log.get(removeHead);
                                this.log.unlockShared();
                                return flumeEvent;
                            } catch (NoopRecordException e) {
                                FileChannel.LOG.warn("Corrupt record replaced by File Channel Integrity tool found. Will retrieve next event", (Throwable) e);
                                this.takeList.remove(removeHead);
                            }
                        } catch (IOException e2) {
                            this.channelCounter.incrementEventTakeErrorCount();
                            throw new ChannelException("Take failed due to IO error " + this.channelNameDescriptor, e2);
                        }
                    } catch (CorruptEventException e3) {
                        this.channelCounter.incrementEventTakeErrorCount();
                        if (this.fsyncPerTransaction) {
                            throw new ChannelException(e3);
                        }
                        FileChannel.LOG.warn("Corrupt record found. Event will be skipped, and next event will be read.", (Throwable) e3);
                        this.takeList.remove(removeHead);
                    }
                } finally {
                    this.log.unlockShared();
                }
            }
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected void doCommit() throws InterruptedException {
            int size = this.putList.size();
            int size2 = this.takeList.size();
            if (size > 0) {
                Preconditions.checkState(size2 == 0, "nonzero puts and takes " + this.channelNameDescriptor);
                this.log.lockShared();
                try {
                    try {
                        this.log.commitPut(this.transactionID);
                        this.channelCounter.addToEventPutSuccessCount(size);
                        synchronized (this.queue) {
                            while (!this.putList.isEmpty()) {
                                if (!this.queue.addTail(this.putList.removeFirst())) {
                                    StringBuilder sb = new StringBuilder();
                                    sb.append("Queue add failed, this shouldn't be able to ");
                                    sb.append("happen. A portion of the transaction has been ");
                                    sb.append("added to the queue but the remaining portion ");
                                    sb.append("cannot be added. Those messages will be consumed ");
                                    sb.append("despite this transaction failing. Please report.");
                                    sb.append(this.channelNameDescriptor);
                                    FileChannel.LOG.error(sb.toString());
                                    Preconditions.checkState(false, sb.toString());
                                }
                            }
                            this.queue.completeTransaction(this.transactionID);
                        }
                        this.log.unlockShared();
                    } catch (IOException e) {
                        throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e);
                    }
                } finally {
                }
            } else if (size2 > 0) {
                this.log.lockShared();
                try {
                    try {
                        this.log.commitTake(this.transactionID);
                        this.queue.completeTransaction(this.transactionID);
                        this.channelCounter.addToEventTakeSuccessCount(size2);
                        this.log.unlockShared();
                        this.queueRemaining.release(size2);
                    } catch (IOException e2) {
                        throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e2);
                    }
                } finally {
                }
            }
            this.putList.clear();
            this.takeList.clear();
            this.channelCounter.setChannelSize(this.queue.getSize());
        }

        @Override // org.apache.flume.channel.BasicTransactionSemantics
        protected void doRollback() throws InterruptedException {
            int size = this.putList.size();
            int size2 = this.takeList.size();
            this.log.lockShared();
            try {
                if (size2 > 0) {
                    try {
                        Preconditions.checkState(size == 0, "nonzero puts and takes " + this.channelNameDescriptor);
                        synchronized (this.queue) {
                            while (!this.takeList.isEmpty()) {
                                Preconditions.checkState(this.queue.addHead(this.takeList.removeLast()), "Queue add failed, this shouldn't be able to happen " + this.channelNameDescriptor);
                            }
                        }
                    } catch (IOException e) {
                        throw new ChannelException("Commit failed due to IO error " + this.channelNameDescriptor, e);
                    }
                }
                this.putList.clear();
                this.takeList.clear();
                this.queue.completeTransaction(this.transactionID);
                this.channelCounter.setChannelSize(this.queue.getSize());
                this.log.rollback(this.transactionID);
                this.log.unlockShared();
                this.queueRemaining.release(size);
            } catch (Throwable th) {
                this.log.unlockShared();
                this.queueRemaining.release(size);
                throw th;
            }
        }
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.NamedComponent
    public synchronized void setName(String str) {
        this.channelNameDescriptor = "[channel=" + str + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END;
        super.setName(str);
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.conf.Configurable
    public void configure(Context context) {
        this.useDualCheckpoints = context.getBoolean(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, false).booleanValue();
        this.compressBackupCheckpoint = context.getBoolean(FileChannelConfiguration.COMPRESS_BACKUP_CHECKPOINT, false).booleanValue();
        String replace = System.getProperty("user.home").replace('\\', '/');
        String trim = context.getString(FileChannelConfiguration.CHECKPOINT_DIR, replace + "/.flume/file-channel/checkpoint").trim();
        String trim2 = context.getString(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
        String[] strArr = (String[]) Iterables.toArray(Splitter.on(BLOBHandler.PARAMETER_SEPARATOR).trimResults().omitEmptyStrings().split(context.getString(FileChannelConfiguration.DATA_DIRS, replace + "/.flume/file-channel/data")), String.class);
        this.checkpointDir = new File(trim);
        if (this.useDualCheckpoints) {
            Preconditions.checkState(!trim2.isEmpty(), "Dual checkpointing is enabled, but the backup directory is not set. Please set backupCheckpointDir to enable dual checkpointing");
            this.backupCheckpointDir = new File(trim2);
            Preconditions.checkState(!this.backupCheckpointDir.equals(this.checkpointDir), "Could not configure " + getName() + ". The checkpoint backup directory and the checkpoint directory are configured to be the same.");
        }
        this.dataDirs = new File[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            this.dataDirs[i] = new File(strArr[i]);
        }
        this.capacity = context.getInteger("capacity", 1000000);
        if (this.capacity.intValue() <= 0) {
            this.capacity = 1000000;
            LOG.warn("Invalid capacity specified, initializing channel to default capacity of {}", this.capacity);
        }
        this.keepAlive = context.getInteger("keep-alive", 3).intValue();
        this.transactionCapacity = context.getInteger(FileChannelConfiguration.TRANSACTION_CAPACITY, 10000);
        if (this.transactionCapacity.intValue() <= 0) {
            this.transactionCapacity = 10000;
            LOG.warn("Invalid transaction capacity specified, initializing channel to default capacity of {}", this.transactionCapacity);
        }
        Preconditions.checkState(this.transactionCapacity.intValue() <= this.capacity.intValue(), "File Channel transaction capacity cannot be greater than the capacity of the channel.");
        this.checkpointInterval = context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL, Long.valueOf(FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL));
        if (this.checkpointInterval.longValue() <= 0) {
            LOG.warn("Checkpoint interval is invalid: " + this.checkpointInterval + ", using default: " + FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
            this.checkpointInterval = Long.valueOf(FileChannelConfiguration.DEFAULT_CHECKPOINT_INTERVAL);
        }
        this.maxFileSize = Math.min(context.getLong(FileChannelConfiguration.MAX_FILE_SIZE, Long.valueOf(FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE)).longValue(), FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE);
        this.minimumRequiredSpace = Math.max(context.getLong(FileChannelConfiguration.MINIMUM_REQUIRED_SPACE, Long.valueOf(FileChannelConfiguration.DEFAULT_MINIMUM_REQUIRED_SPACE)).longValue(), 1048576L);
        this.useLogReplayV1 = context.getBoolean(FileChannelConfiguration.USE_LOG_REPLAY_V1, false).booleanValue();
        this.useFastReplay = context.getBoolean(FileChannelConfiguration.USE_FAST_REPLAY, false).booleanValue();
        Context context2 = new Context(context.getSubProperties("encryption."));
        String string = context2.getString(EncryptionConfiguration.KEY_PROVIDER);
        this.encryptionActiveKey = context2.getString(EncryptionConfiguration.ACTIVE_KEY);
        this.encryptionCipherProvider = context2.getString(EncryptionConfiguration.CIPHER_PROVIDER);
        if (string != null) {
            Preconditions.checkState(!Strings.isNullOrEmpty(this.encryptionActiveKey), "Encryption configuration problem: activeKey is missing");
            Preconditions.checkState(!Strings.isNullOrEmpty(this.encryptionCipherProvider), "Encryption configuration problem: cipherProvider is missing");
            this.encryptionKeyProvider = KeyProviderFactory.getInstance(string, new Context(context2.getSubProperties("keyProvider.")));
        } else {
            Preconditions.checkState(this.encryptionActiveKey == null, "Encryption configuration problem: activeKey is present while key provider name is not.");
            Preconditions.checkState(this.encryptionCipherProvider == null, "Encryption configuration problem: cipherProvider is present while key provider name is not.");
        }
        this.fsyncPerTransaction = context.getBoolean(FileChannelConfiguration.FSYNC_PER_TXN, true).booleanValue();
        this.fsyncInterval = context.getInteger(FileChannelConfiguration.FSYNC_INTERVAL, 5).intValue();
        this.checkpointOnClose = context.getBoolean(FileChannelConfiguration.CHKPT_ONCLOSE, FileChannelConfiguration.DEFAULT_CHKPT_ONCLOSE).booleanValue();
        if (this.queueRemaining == null) {
            this.queueRemaining = new Semaphore(this.capacity.intValue(), true);
        }
        if (this.log != null) {
            this.log.setCheckpointInterval(this.checkpointInterval.longValue());
            this.log.setMaxFileSize(this.maxFileSize);
        }
        if (this.channelCounter == null) {
            this.channelCounter = new FileChannelCounter(getName());
        }
        this.channelCounter.setUnhealthy(0);
    }

    /* JADX WARN: Removed duplicated region for block: B:6:0x00c9  */
    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.lifecycle.LifecycleAware
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public synchronized void start() {
        /*
            Method dump skipped, instructions count: 233
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flume.channel.file.FileChannel.start():void");
    }

    @VisibleForTesting
    Log.Builder createLogBuilder() {
        Log.Builder builder = new Log.Builder();
        builder.setCheckpointInterval(this.checkpointInterval.longValue());
        builder.setMaxFileSize(this.maxFileSize);
        builder.setMinimumRequiredSpace(this.minimumRequiredSpace);
        builder.setQueueSize(this.capacity.intValue());
        builder.setCheckpointDir(this.checkpointDir);
        builder.setLogDirs(this.dataDirs);
        builder.setChannelName(getName());
        builder.setUseLogReplayV1(this.useLogReplayV1);
        builder.setUseFastReplay(this.useFastReplay);
        builder.setEncryptionKeyProvider(this.encryptionKeyProvider);
        builder.setEncryptionKeyAlias(this.encryptionActiveKey);
        builder.setEncryptionCipherProvider(this.encryptionCipherProvider);
        builder.setUseDualCheckpoints(this.useDualCheckpoints);
        builder.setCompressBackupCheckpoint(this.compressBackupCheckpoint);
        builder.setBackupCheckpointDir(this.backupCheckpointDir);
        builder.setFsyncPerTransaction(this.fsyncPerTransaction);
        builder.setFsyncInterval(this.fsyncInterval);
        builder.setCheckpointOnClose(this.checkpointOnClose);
        builder.setChannelCounter(this.channelCounter);
        return builder;
    }

    @Override // org.apache.flume.channel.AbstractChannel, org.apache.flume.lifecycle.LifecycleAware
    public synchronized void stop() {
        LOG.info("Stopping {}...", this);
        this.startupError = null;
        int depth = getDepth();
        close();
        if (!this.open) {
            this.channelCounter.setChannelSize(depth);
            this.channelCounter.stop();
        }
        super.stop();
    }

    @Override // org.apache.flume.channel.AbstractChannel
    public String toString() {
        return "FileChannel " + getName() + " { dataDirs: " + Arrays.toString(this.dataDirs) + " }";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flume.channel.BasicChannelSemantics
    public BasicTransactionSemantics createTransaction() {
        if (!this.open) {
            String str = "Channel closed " + this.channelNameDescriptor;
            if (this.startupError != null) {
                throw new IllegalStateException(str + ". Due to " + this.startupError.getClass().getName() + ": " + this.startupError.getMessage(), this.startupError);
            }
            throw new IllegalStateException(str);
        }
        FileBackedTransaction fileBackedTransaction = this.transactions.get();
        if (fileBackedTransaction != null && !fileBackedTransaction.isClosed()) {
            Preconditions.checkState(false, "Thread has transaction which is still open: " + fileBackedTransaction.getStateAsString() + this.channelNameDescriptor);
        }
        FileBackedTransaction fileBackedTransaction2 = new FileBackedTransaction(this.log, TransactionIDOracle.next(), this.transactionCapacity.intValue(), this.keepAlive, this.queueRemaining, getName(), this.fsyncPerTransaction, this.channelCounter);
        this.transactions.set(fileBackedTransaction2);
        return fileBackedTransaction2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getDepth() {
        Preconditions.checkState(this.open, "Channel closed" + this.channelNameDescriptor);
        Preconditions.checkNotNull(this.log, "log");
        FlumeEventQueue flumeEventQueue = this.log.getFlumeEventQueue();
        Preconditions.checkNotNull(flumeEventQueue, "queue");
        return flumeEventQueue.getSize();
    }

    void close() {
        if (this.open) {
            setOpen(false);
            try {
                this.log.close();
            } catch (Exception e) {
                LOG.error("Error while trying to close the log.", (Throwable) e);
                Throwables.propagate(e);
            }
            this.log = null;
            this.queueRemaining = null;
        }
    }

    @VisibleForTesting
    boolean didFastReplay() {
        return this.log.didFastReplay();
    }

    @VisibleForTesting
    boolean didFullReplayDueToBadCheckpointException() {
        return this.log.didFullReplayDueToBadCheckpointException();
    }

    public boolean isOpen() {
        return this.open;
    }

    private void setOpen(boolean z) {
        this.open = z;
        this.channelCounter.setOpen(this.open);
    }

    @VisibleForTesting
    boolean checkpointBackupRestored() {
        if (this.log != null) {
            return this.log.backupRestored();
        }
        return false;
    }

    @VisibleForTesting
    Log getLog() {
        return this.log;
    }

    @VisibleForTesting
    FileChannelCounter getChannelCounter() {
        return this.channelCounter;
    }

    @Override // org.apache.flume.conf.TransactionCapacitySupported
    public long getTransactionCapacity() {
        return this.transactionCapacity.intValue();
    }
}
