package io.datakernel.stream.processor;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import io.datakernel.async.CompletionCallback;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.stream.AbstractStreamConsumer;
import io.datakernel.stream.StreamDataReceiver;
import io.datakernel.stream.StreamForwarder;
import io.datakernel.stream.StreamProducer;
import io.datakernel.stream.StreamProducers;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/datakernel/stream/processor/StreamSorter.class */
public class StreamSorter<K, T> extends AbstractStreamConsumer<T> implements StreamDataReceiver<T>, StreamSorterMBean {
    private final StreamMergeSorterStorage<T> storage;
    private final Function<T, K> keyFunction;
    private final Comparator<K> keyComparator;
    private final boolean deduplicate;
    protected final int itemsInMemorySize;
    private final Comparator<T> itemComparator;
    protected List<T> list;
    private List<Integer> listOfPartitions;
    private StreamProducer<T> saveProducer;
    private StreamForwarder<T> result;
    protected long jmxItems;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamSorter(Eventloop eventloop, StreamMergeSorterStorage<T> streamMergeSorterStorage, final Function<T, K> function, final Comparator<K> comparator, boolean z, int i) {
        super(eventloop);
        this.storage = (StreamMergeSorterStorage) Preconditions.checkNotNull(streamMergeSorterStorage);
        this.keyComparator = (Comparator) Preconditions.checkNotNull(comparator);
        this.keyFunction = (Function) Preconditions.checkNotNull(function);
        this.deduplicate = z;
        Preconditions.checkArgument(i > 0, "itemsInMemorySize must be positive value, got %s", new Object[]{Integer.valueOf(i)});
        this.itemsInMemorySize = i;
        this.itemComparator = new Comparator<T>() { // from class: io.datakernel.stream.processor.StreamSorter.1
            private final Function<T, K> _keyFunction;
            private final Comparator<K> _keyComparator;

            {
                this._keyFunction = function;
                this._keyComparator = comparator;
            }

            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.util.Comparator
            public int compare(T t, T t2) {
                return this._keyComparator.compare(this._keyFunction.apply(t), this._keyFunction.apply(t2));
            }
        };
        this.list = new ArrayList(i + (i >> 4));
        this.listOfPartitions = new ArrayList();
        this.result = new StreamForwarder<>(eventloop);
        this.result.addCompletionCallback(new CompletionCallback() { // from class: io.datakernel.stream.processor.StreamSorter.2
            public void onComplete() {
                StreamSorter.this.closeUpstream();
            }

            public void onException(Exception exc) {
                StreamSorter.this.closeUpstreamWithError(exc);
            }
        });
    }

    public StreamProducer<T> getSortedStream() {
        return this.result;
    }

    @Override // io.datakernel.stream.StreamConsumer
    public StreamDataReceiver<T> getDataReceiver() {
        return this;
    }

    @Override // io.datakernel.stream.StreamDataReceiver
    public void onData(T t) {
        if (!$assertionsDisabled) {
            long j = this.jmxItems;
            long j2 = this.jmxItems + 1;
            this.jmxItems = j2;
            if (j == j2) {
                throw new AssertionError();
            }
        }
        this.list.add(t);
        if (this.list.size() >= this.itemsInMemorySize) {
            nextState();
        }
    }

    protected void nextState() {
        if (this.result.getUpstream() != null) {
            return;
        }
        boolean z = this.list.size() >= this.itemsInMemorySize;
        if (this.saveProducer != null) {
            if (z) {
                suspendUpstream();
                return;
            }
            return;
        }
        if (getUpstreamStatus() == 2) {
            StreamMerger streamMerger = StreamMerger.streamMerger(this.eventloop, this.keyFunction, this.keyComparator, this.deduplicate);
            Collections.sort(this.list, this.itemComparator);
            StreamProducer ofIterable = StreamProducers.ofIterable(this.eventloop, this.list);
            this.list = null;
            ofIterable.streamTo(streamMerger.newInput());
            Iterator<Integer> it = this.listOfPartitions.iterator();
            while (it.hasNext()) {
                this.storage.streamReader(it.next().intValue()).streamTo(streamMerger.newInput());
            }
            streamMerger.streamTo(this.result);
            return;
        }
        if (!z) {
            resumeUpstream();
            return;
        }
        Collections.sort(this.list, this.itemComparator);
        this.saveProducer = StreamProducers.ofIterable(this.eventloop, this.list);
        this.listOfPartitions.add(Integer.valueOf(this.storage.nextPartition()));
        this.saveProducer.streamTo(this.storage.streamWriter());
        this.saveProducer.addCompletionCallback(new CompletionCallback() { // from class: io.datakernel.stream.processor.StreamSorter.3
            public void onComplete() {
                StreamSorter.this.eventloop.post(new Runnable() { // from class: io.datakernel.stream.processor.StreamSorter.3.1
                    @Override // java.lang.Runnable
                    public void run() {
                        StreamSorter.this.saveProducer = null;
                        StreamSorter.this.nextState();
                    }
                });
            }

            public void onException(Exception exc) {
                new StreamProducers.ClosingWithError(StreamSorter.this.eventloop, exc).streamTo(StreamSorter.this.result);
            }
        });
        this.list = new ArrayList(this.list.size() + (this.list.size() >> 4));
    }

    @Override // io.datakernel.stream.StreamConsumer
    public void onEndOfStream() {
        nextState();
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer, io.datakernel.stream.StreamConsumer
    public void onError(Exception exc) {
        this.result.onError(exc);
        this.upstreamProducer.closeWithError(exc);
    }

    @Override // io.datakernel.stream.processor.StreamSorterMBean
    public long getItems() {
        return this.jmxItems;
    }

    @Override // io.datakernel.stream.AbstractStreamConsumer
    public String toString() {
        String str = "?";
        if (!$assertionsDisabled) {
            String str2 = "" + this.jmxItems;
            str = str2;
            if (str2 == null) {
                throw new AssertionError();
            }
        }
        return '{' + super.toString() + " items:" + str + '}';
    }

    static {
        $assertionsDisabled = !StreamSorter.class.desiredAssertionStatus();
    }
}
