package org.apache.flume.channel.recoverable.memory;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.recoverable.memory.wal.WAL;
import org.apache.flume.channel.recoverable.memory.wal.WALEntry;
import org.apache.flume.channel.recoverable.memory.wal.WALReplayResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
@Deprecated
/* loaded from: input_file:org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel.class */
public class RecoverableMemoryChannel extends BasicChannelSemantics {
    private static final Logger LOG = LoggerFactory.getLogger(RecoverableMemoryChannel.class);
    public static final String WAL_DATA_DIR = "wal.dataDir";
    public static final String WAL_ROLL_SIZE = "wal.rollSize";
    public static final String WAL_MAX_LOGS_SIZE = "wal.maxLogsSize";
    public static final String WAL_MIN_RETENTION_PERIOD = "wal.minRetentionPeriod";
    public static final String WAL_WORKER_INTERVAL = "wal.workerInterval";
    public static final String CAPACITY = "capacity";
    public static final String KEEPALIVE = "keep-alive";
    public static final int DEFAULT_CAPACITY = 100;
    public static final int DEFAULT_KEEPALIVE = 3;
    private WAL<RecoverableMemoryChannelEvent> wal;
    private Semaphore queueRemaining;
    private int capacity;
    private int keepAlive;
    private MemoryChannel memoryChannel = new MemoryChannel();
    private AtomicLong seqidGenerator = new AtomicLong(0);
    private volatile boolean open = false;

    /* loaded from: input_file:org/apache/flume/channel/recoverable/memory/RecoverableMemoryChannel$RecoverableMemoryTransaction.class */
    private static class RecoverableMemoryTransaction extends BasicTransactionSemantics {
        private Transaction transaction;
        private MemoryChannel memoryChannel;
        private RecoverableMemoryChannel channel;
        private List<Long> sequenceIds;
        private List<RecoverableMemoryChannelEvent> events;
        private int takes;

        private RecoverableMemoryTransaction(RecoverableMemoryChannel recoverableMemoryChannel, MemoryChannel memoryChannel) {
            this.sequenceIds = Lists.newArrayList();
            this.events = Lists.newArrayList();
            this.channel = recoverableMemoryChannel;
            this.memoryChannel = memoryChannel;
            this.transaction = this.memoryChannel.getTransaction();
            this.takes = 0;
        }

        protected void doBegin() throws InterruptedException {
            this.transaction.begin();
        }

        protected void doPut(Event event) throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            if (!this.channel.queueRemaining.tryAcquire(this.channel.keepAlive, TimeUnit.SECONDS)) {
                throw new ChannelException("Cannot acquire capacity");
            }
            RecoverableMemoryChannelEvent recoverableMemoryChannelEvent = new RecoverableMemoryChannelEvent(event, this.channel.nextSequenceID());
            this.memoryChannel.put(recoverableMemoryChannelEvent);
            this.events.add(recoverableMemoryChannelEvent);
        }

        protected Event doTake() throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            RecoverableMemoryChannelEvent recoverableMemoryChannelEvent = (RecoverableMemoryChannelEvent) this.memoryChannel.take();
            if (recoverableMemoryChannelEvent == null) {
                return null;
            }
            this.sequenceIds.add(Long.valueOf(recoverableMemoryChannelEvent.sequenceId));
            this.takes++;
            return recoverableMemoryChannelEvent.event;
        }

        protected void doCommit() throws InterruptedException {
            if (!this.channel.open) {
                throw new ChannelException("Channel not open");
            }
            if (this.sequenceIds.size() > 0) {
                try {
                    this.channel.commitSequenceID(this.sequenceIds);
                } catch (IOException e) {
                    throw new ChannelException("Unable to commit", e);
                }
            }
            if (!this.events.isEmpty()) {
                try {
                    this.channel.commitEvents(this.events);
                } catch (IOException e2) {
                    throw new ChannelException("Unable to commit", e2);
                }
            }
            this.transaction.commit();
            this.channel.queueRemaining.release(this.takes);
        }

        protected void doRollback() throws InterruptedException {
            this.sequenceIds.clear();
            this.events.clear();
            this.channel.queueRemaining.release(this.events.size());
            this.transaction.rollback();
        }

        protected void doClose() {
            this.sequenceIds.clear();
            this.events.clear();
            this.transaction.close();
        }
    }

    public void configure(Context context) {
        this.memoryChannel.configure(context);
        int intValue = context.getInteger(CAPACITY, 100).intValue();
        if (this.queueRemaining == null) {
            this.queueRemaining = new Semaphore(intValue, true);
        } else if (intValue > this.capacity) {
            this.queueRemaining.release(intValue - this.capacity);
        } else if (intValue < this.capacity) {
            this.queueRemaining.acquireUninterruptibly(this.capacity - intValue);
        }
        this.capacity = intValue;
        this.keepAlive = context.getInteger(KEEPALIVE, 3).intValue();
        long longValue = context.getLong(WAL_ROLL_SIZE, Long.valueOf(WAL.DEFAULT_ROLL_SIZE)).longValue();
        long longValue2 = context.getLong(WAL_MAX_LOGS_SIZE, Long.valueOf(WAL.DEFAULT_MAX_LOGS_SIZE)).longValue();
        long longValue3 = context.getLong(WAL_MIN_RETENTION_PERIOD, Long.valueOf(WAL.DEFAULT_MIN_LOG_RETENTION_PERIOD)).longValue();
        long longValue4 = context.getLong(WAL_WORKER_INTERVAL, Long.valueOf(WAL.DEFAULT_WORKER_INTERVAL)).longValue();
        if (this.wal == null) {
            try {
                this.wal = new WAL<>(new File(context.getString(WAL_DATA_DIR, System.getProperty("user.home").replace('\\', '/') + "/.flume/recoverable-memory-channel")), RecoverableMemoryChannelEvent.class, longValue, longValue2, longValue3, longValue4);
                return;
            } catch (IOException e) {
                Throwables.propagate(e);
                return;
            }
        }
        this.wal.setRollSize(longValue);
        this.wal.setMaxLogsSize(longValue2);
        this.wal.setMinLogRetentionPeriod(longValue3);
        this.wal.setWorkerInterval(longValue4);
        LOG.warn(getClass().getSimpleName() + " only supports partial reconfiguration.");
    }

    public synchronized void start() {
        LOG.info("Starting " + this);
        try {
            WALReplayResult<RecoverableMemoryChannelEvent> replay = this.wal.replay();
            Preconditions.checkArgument(replay.getSequenceID() >= 0);
            LOG.info("Replay SequenceID " + replay.getSequenceID());
            this.seqidGenerator.set(replay.getSequenceID());
            int size = replay.getResults().size();
            Preconditions.checkState(size <= this.capacity, "Capacity " + this.capacity + ", but we need to replay " + size);
            LOG.info("Replay Events " + size);
            Iterator<WALEntry<RecoverableMemoryChannelEvent>> it = replay.getResults().iterator();
            while (it.hasNext()) {
                this.seqidGenerator.set(Math.max(it.next().getSequenceID(), this.seqidGenerator.get()));
            }
            for (WALEntry<RecoverableMemoryChannelEvent> wALEntry : replay.getResults()) {
                Transaction transaction = null;
                try {
                    try {
                        transaction = this.memoryChannel.getTransaction();
                        transaction.begin();
                        this.memoryChannel.put(wALEntry.getData());
                        transaction.commit();
                        if (transaction != null) {
                            transaction.close();
                        }
                    } finally {
                    }
                } catch (Error e) {
                    if (transaction != null) {
                        try {
                            transaction.rollback();
                        } catch (Exception e2) {
                            LOG.info("Error during rollback", e2);
                        }
                    }
                    throw e;
                } catch (Exception e3) {
                    if (transaction != null) {
                        try {
                            transaction.rollback();
                        } catch (Exception e4) {
                            LOG.info("Error during rollback", e4);
                        }
                    }
                    Throwables.propagate(e3);
                    if (transaction != null) {
                        transaction.close();
                    }
                }
            }
        } catch (IOException e5) {
            Throwables.propagate(e5);
        }
        super.start();
        this.open = true;
    }

    public synchronized void stop() {
        this.open = false;
        LOG.info("Stopping " + this);
        try {
            close();
        } catch (IOException e) {
            Throwables.propagate(e);
        }
        super.stop();
    }

    protected BasicTransactionSemantics createTransaction() {
        return new RecoverableMemoryTransaction(this.memoryChannel);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitEvents(List<RecoverableMemoryChannelEvent> list) throws IOException {
        ArrayList newArrayList = Lists.newArrayList();
        for (RecoverableMemoryChannelEvent recoverableMemoryChannelEvent : list) {
            newArrayList.add(new WALEntry(recoverableMemoryChannelEvent, recoverableMemoryChannelEvent.sequenceId));
        }
        this.wal.writeEntries(newArrayList);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void commitSequenceID(List<Long> list) throws IOException {
        this.wal.writeSequenceIDs(list);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long nextSequenceID() {
        return this.seqidGenerator.incrementAndGet();
    }

    void close() throws IOException {
        if (this.wal != null) {
            this.wal.close();
        }
    }
}
