package io.bigdime.core.channel;

import io.bigdime.alert.LoggerFactory;
import io.bigdime.core.commons.AdaptorLogger;
import io.bigdime.core.commons.PropertyHelper;
import io.bigdime.core.config.AdaptorConfigConstants;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.flume.ChannelException;
import org.apache.flume.Event;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * In-memory channel that buffers {@link Event}s in a list and hands them out to one
 * or more named consumers.
 *
 * <p>Every registered consumer sees every event. An event is dropped from the buffer
 * (and its body size subtracted from the channel size) once each registered consumer
 * has taken it. When no consumer has been registered, the parameterless
 * {@link #take()} / {@link #take(int)} act on behalf of a single implicit consumer
 * named {@code "default"}.
 *
 * <p>Producers block in {@link #put(Event)} while adding the event would push the
 * total body size above the configured capacity. All buffer bookkeeping is guarded by
 * this object's monitor.
 */
@Scope("prototype")
@Component
public class MemoryChannel extends AbstractChannel {
    private static final AdaptorLogger logger = new AdaptorLogger(LoggerFactory.getLogger((Class<?>) MemoryChannel.class));

    /** Name of the implicit consumer used when no consumer has been registered. */
    private static final String DEFAULT_CONSUMER = "default";

    private String channelId;
    /** Number of events removed from the head of {@link #eventList} so far. */
    private int removedCount;
    private boolean printStats;
    /** Read from configuration in seconds; {@link #build()} converts it to milliseconds. */
    private long printStatsDurationInSeconds;
    private long putCount;
    private long takeCount;
    private long channelCapacity;
    private Set<String> consumerNames = new HashSet<>();
    private List<Event> eventList = Collections.synchronizedList(new ArrayList<Event>());
    /** Consumer name -> absolute index (counted since channel creation) of the last event it took. */
    private Map<String, Integer> consumerToTakenIndexMap = new HashMap<>();
    /** Absolute event index -> number of consumers that have taken that event so far. */
    private Map<Integer, Integer> indexToTakenCountMap = new HashMap<>();
    private long channelSizeInBytes = 0;
    private long maxSizeInBytes = 0;
    /** Written by {@link #stop()} and polled by the stats thread, hence volatile. */
    private volatile boolean channelStopped = false;

    /**
     * Assigns a fresh random channel id and reads the stats flag, capacity and stats
     * interval from the channel properties.
     */
    @Override // io.bigdime.core.DataChannel
    public void build() {
        this.channelId = UUID.randomUUID().toString();
        this.printStats = PropertyHelper.getBooleanProperty(getProperties(), AdaptorConfigConstants.ChannelConfigConstants.PRINT_STATS);
        this.channelCapacity = PropertyHelper.getLongProperty(getProperties(), AdaptorConfigConstants.ChannelConfigConstants.CHANNEL_CAPACITY, 67108864L);
        this.printStatsDurationInSeconds = PropertyHelper.getLongProperty(getProperties(), AdaptorConfigConstants.ChannelConfigConstants.PRINT_STATS_DURATION_IN_SECONDS, 60L);
        logger.debug("building memory channel-notsync", "channel_name=\"{}\" channel_id=\"{}\" printStats=\"{}\" printStatsDurationInSeconds=\"{}\" channelCapacity=\"{}\"", getName(), this.channelId, this.printStats, this.printStatsDurationInSeconds, this.channelCapacity);
        // From here on the field holds milliseconds; it is passed straight to Thread.sleep.
        this.printStatsDurationInSeconds *= 1000;
    }

    /**
     * Appends an event to the channel, blocking while the channel is too full.
     *
     * @param event event to add; its body length counts against the channel capacity
     * @throws ChannelException if the event alone is larger than the channel capacity
     *         (it could never be accepted), or if the calling thread is interrupted
     *         while waiting for space
     */
    @Override // org.apache.flume.Channel
    public void put(Event event) {
        logger.debug("putting event on memory channel", "channel_name=\"{}\" channelSizeInBytes=\"{}\"", getName(), this.channelSizeInBytes);
        final int eventSize = event.getBody().length;
        synchronized (this) {
            if (eventSize > this.channelCapacity) {
                // Waiting would never end: even an empty channel cannot hold this event.
                throw new ChannelException("Event of " + eventSize + " bytes exceeds channel capacity of " + this.channelCapacity + " bytes");
            }
            while (this.channelSizeInBytes + eventSize > this.channelCapacity) {
                try {
                    logger.debug("waitingto put event on memory channel", "channel_name=\"{}\" channelCapacity=\"{}\"", getName(), this.channelCapacity);
                    wait(1000L);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up instead of spinning.
                    Thread.currentThread().interrupt();
                    throw new ChannelException("Interrupted while waiting to put event on channel " + getName(), e);
                }
            }
            this.channelSizeInBytes += eventSize;
            if (this.channelSizeInBytes >= this.maxSizeInBytes) {
                this.maxSizeInBytes = this.channelSizeInBytes;
            }
            this.eventList.add(event);
            // Counted under the monitor so concurrent producers cannot lose increments.
            this.putCount++;
            notifyAll();
        }
    }

    /**
     * Takes one event on behalf of the implicit {@code "default"} consumer.
     *
     * @return the next event
     * @throws UnsupportedOperationException if any consumer has been registered
     * @throws ChannelException if no event is available
     */
    @Override // io.bigdime.core.DataChannel, org.apache.flume.Channel
    public synchronized Event take() {
        if (noConsumerRegistered()) {
            return take(DEFAULT_CONSUMER);
        }
        throw new UnsupportedOperationException("this channel has registered consumers, invoking take method without parameters is not supported.");
    }

    /**
     * Takes up to {@code i} events on behalf of the implicit {@code "default"} consumer.
     *
     * @param i maximum number of events to return
     * @return the events taken, in insertion order
     * @throws UnsupportedOperationException if any consumer has been registered
     * @throws ChannelException if no event is available
     */
    @Override // io.bigdime.core.DataChannel
    public synchronized List<Event> take(int i) {
        if (noConsumerRegistered()) {
            return take(DEFAULT_CONSUMER, i);
        }
        throw new UnsupportedOperationException("this channel has registered consumers, invoking take method without parameters is not supported.");
    }

    /**
     * Registers a consumer name with this channel.
     *
     * @param consumerName name to register
     * @return {@code true} if the name was not registered before
     */
    @Override // io.bigdime.core.DataChannel
    public synchronized boolean registerConsumer(String consumerName) {
        logger.info("registering consumers", "_message=\"before registering\" channel_name=\"{}\" consumers=\"{}\"", getName(), this.consumerNames);
        final boolean added = this.consumerNames.add(consumerName);
        logger.info("registering consumers", "_message=\"after registering\" channel_name=\"{}\" consumers=\"{}\"", getName(), this.consumerNames);
        return added;
    }

    /**
     * Takes the next event for the given consumer.
     *
     * @param consumerName consumer on whose behalf the event is taken
     * @return the next event not yet seen by that consumer
     * @throws ChannelException if that consumer has already seen every buffered event
     */
    @Override // io.bigdime.core.DataChannel
    public synchronized Event take(String consumerName) {
        return take(consumerName, 1).get(0);
    }

    /**
     * Takes up to {@code batchSize} events for the given consumer.
     *
     * <p>Each consumer has its own read position. An event is removed from the buffer,
     * and its body size released, once the required number of consumers has taken it:
     * the number of registered consumers, or one when none is registered (the implicit
     * default consumer). Waiting producers are notified afterwards.
     *
     * @param consumerName consumer on whose behalf the events are taken
     * @param batchSize maximum number of events to return
     * @return a copy of the events taken, in insertion order
     * @throws ChannelException if that consumer has already seen every buffered event
     */
    @Override // io.bigdime.core.DataChannel
    public synchronized List<Event> take(String consumerName, int batchSize) {
        final Integer lastTakenIndex = this.consumerToTakenIndexMap.get(consumerName);
        final int nextIndex = lastTakenIndex == null ? 0 : lastTakenIndex.intValue() + 1;
        // Absolute indices keep growing; the list shrinks from the head, so shift by removedCount.
        final int listOffset = nextIndex - this.removedCount;
        final int available = this.eventList.size() - listOffset;
        if (available == 0) {
            throw new ChannelException("No data found on channel");
        }
        final int count = Math.min(batchSize, available);
        // Copy before mutating the list: removals below would invalidate a subList view.
        final List<Event> taken = new ArrayList<>(this.eventList.subList(listOffset, listOffset + count));
        this.consumerToTakenIndexMap.put(consumerName, Integer.valueOf(nextIndex + count - 1));

        // With no registered consumer the implicit default consumer is the only reader, so
        // one take must be enough to free the event; comparing against a size of 0 would
        // keep every event (and its bytes) in the channel forever.
        final int requiredTakes = Math.max(1, this.consumerNames.size());
        for (int offset = 0; offset < count; offset++) {
            final Integer absoluteIndex = Integer.valueOf(nextIndex + offset);
            final Integer previousTakes = this.indexToTakenCountMap.get(absoluteIndex);
            final int takes = (previousTakes == null ? 0 : previousTakes.intValue()) + 1;
            if (takes == requiredTakes) {
                // The last consumer to reach an event is the slowest one, so that event is at the head.
                this.channelSizeInBytes -= this.eventList.remove(0).getBody().length;
                this.indexToTakenCountMap.remove(absoluteIndex);
                this.removedCount++;
            } else {
                this.indexToTakenCountMap.put(absoluteIndex, Integer.valueOf(takes));
            }
        }
        this.takeCount++;
        notifyAll();
        return taken;
    }

    /** @return number of events put on this channel so far */
    public long getPutCount() {
        return this.putCount;
    }

    /** @return number of completed {@code take} calls (batches, not individual events) */
    public long getTakeCount() {
        return this.takeCount;
    }

    /**
     * Logs a snapshot of the channel counters. Synchronized so the snapshot is consistent
     * and the consumer set is not rendered while {@link #registerConsumer(String)} mutates it.
     */
    public synchronized void printStats() {
        logger.info("print MemoryChannelStats", "channel_id=\"{}\" channel_name=\"{}\" channelCapacity=\"{}\" total_puts=\"{}\" total_takes=\"{}\" current_list_size=\"{}\" size_in_bytes=\"{}\" max_size_in_bytes=\"{}\" current_consumer_count=\"{}\" consumer_names=\"{}\" this=\"{}\"", getChannelId(), getName(), this.channelCapacity, this.putCount, this.takeCount, this.eventList.size(), this.channelSizeInBytes, this.maxSizeInBytes, this.consumerNames.size(), this.consumerNames, getClass());
    }

    /** Starts the channel; launches the periodic stats thread when stats printing is enabled. */
    @Override // org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("starting MemoryChannelStats", "starting MemoryChannel, channel_name=\"{}\" channel_id=\"{}\"", getName(), getChannelId());
        if (this.printStats) {
            logger.info("starting MemoryChannelStats", "starting MemoryChannel");
            startStatsThread();
        }
        logger.info("starting MemoryChannelStats", "started MemoryChannel, channel_name=\"{}\" channel_id=\"{}\"", getName(), getChannelId());
    }

    /** Flags the channel as stopped; the stats thread exits at its next loop check. */
    @Override // org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        this.channelStopped = true;
    }

    /** @return the random id assigned in {@link #build()}, or {@code null} before that */
    public String getChannelId() {
        return this.channelId;
    }

    /** @return {@code true} when no consumer has been registered on this channel */
    private boolean noConsumerRegistered() {
        return this.consumerNames == null || this.consumerNames.isEmpty();
    }

    /** Spawns the thread that logs channel stats at the configured interval until the channel is stopped. */
    private void startStatsThread() {
        final Thread statsThread = new Thread() {
            @Override
            public void run() {
                while (!MemoryChannel.this.channelStopped) {
                    try {
                        MemoryChannel.logger.info("heathcheck thread for MemoryChannel", "printing stats, printStatsDuration=\"{}\"", MemoryChannel.this.printStatsDurationInSeconds);
                        MemoryChannel.this.printStats();
                        Thread.sleep(MemoryChannel.this.printStatsDurationInSeconds);
                    } catch (Exception e) {
                        // Deliberately best-effort: a failed stats print must not kill the thread.
                        MemoryChannel.logger.warn("heathcheck thread for MemoryChannel", "health check thread received an exception, will duck it. printStatsDurationInSeconds=\"{}\"", MemoryChannel.this.printStatsDurationInSeconds, e);
                    }
                }
            }
        };
        statsThread.start();
        // thread_name is the stats thread's own name; an unqualified getName() here is the channel name.
        logger.info("started heathcheck thread for MemoryChannel", "channel_id=\"{}\" thread_name=\"{}\" channel_name=\"{}\" ", getChannelId(), statsThread.getName(), getName());
    }
}
