package io.fluo.recipes.export;

import com.google.common.collect.Iterators;
import io.fluo.api.client.TransactionBase;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.Column;
import io.fluo.api.observer.AbstractObserver;
import io.fluo.api.observer.Observer;
import io.fluo.recipes.export.ExportQueue;
import io.fluo.recipes.serialization.SimpleSerializer;
import java.util.Iterator;
import java.util.NoSuchElementException;

/* loaded from: input_file:io/fluo/recipes/export/ExportObserver.class */
public class ExportObserver<K, V> extends AbstractObserver {
    private String queueId;
    private Class<K> keyType;
    private Class<V> valType;
    SimpleSerializer serializer;
    private Exporter<K, V> exporter;
    private long memLimit;

    /* loaded from: input_file:io/fluo/recipes/export/ExportObserver$MemLimitIterator.class */
    private static class MemLimitIterator implements Iterator<ExportEntry> {
        private long memConsumed = 0;
        private long memLimit;
        private Iterator<ExportEntry> source;

        public MemLimitIterator(Iterator<ExportEntry> it, long j) {
            this.source = it;
            this.memLimit = j;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.memConsumed < this.memLimit && this.source.hasNext();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ExportEntry next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            ExportEntry next = this.source.next();
            this.memConsumed += next.key.length + next.value.length;
            return next;
        }

        @Override // java.util.Iterator
        public void remove() {
            this.source.remove();
        }
    }

    protected String getQueueId() {
        return this.queueId;
    }

    SimpleSerializer getSerializer() {
        return this.serializer;
    }

    public void init(Observer.Context context) throws Exception {
        this.queueId = (String) context.getParameters().get("queueId");
        ExportQueue.Options options = new ExportQueue.Options(this.queueId, context.getAppConfiguration());
        this.keyType = (Class<K>) getClass().getClassLoader().loadClass(options.keyType);
        this.valType = (Class<V>) getClass().getClassLoader().loadClass(options.valueType);
        this.exporter = (Exporter) getClass().getClassLoader().loadClass(options.exporterType).asSubclass(Exporter.class).newInstance();
        this.serializer = SimpleSerializer.getInstance(context.getAppConfiguration());
        this.memLimit = options.getBufferSize();
        this.exporter.init(this.queueId, context);
    }

    public Observer.ObservedColumn getObservedColumn() {
        return new Observer.ObservedColumn(ExportBucket.newNotificationColumn(this.queueId), Observer.NotificationType.WEAK);
    }

    public void process(TransactionBase transactionBase, Bytes bytes, Column column) throws Exception {
        ExportBucket exportBucket = new ExportBucket(transactionBase, bytes);
        Iterator<ExportEntry> exportIterator = exportBucket.getExportIterator();
        this.exporter.processExports(Iterators.consumingIterator(Iterators.transform(new MemLimitIterator(exportIterator, this.memLimit), exportEntry -> {
            return new SequencedExport(this.serializer.deserialize(exportEntry.key, this.keyType), this.serializer.deserialize(exportEntry.value, this.valType), exportEntry.seq);
        })));
        if (exportIterator.hasNext()) {
            exportBucket.notifyExportObserver();
        }
    }
}
