/*
 * Decompiled with CFR 0.152.
 */
package cz.seznam.euphoria.core.client.io;

import cz.seznam.euphoria.core.client.io.BoundedDataSource;
import cz.seznam.euphoria.core.client.io.BoundedReader;
import cz.seznam.euphoria.core.client.io.CloseableIterator;
import cz.seznam.euphoria.core.client.io.UnboundedDataSource;
import cz.seznam.euphoria.core.client.io.UnboundedPartition;
import cz.seznam.euphoria.core.client.io.UnboundedReader;
import cz.seznam.euphoria.shadow.com.google.common.collect.Lists;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * A data source backed by in-memory lists, one list per partition. The same
 * class serves as a bounded source (see {@link #split(long)} and
 * {@link #openReader()}) and as an unbounded source (see
 * {@link #getPartitions()}), depending on the flag given at creation time.
 *
 * <p>The partition data is not held in an instance field. It lives in a static,
 * synchronized {@link WeakHashMap} keyed by the source, and sources compare
 * equal when their {@code id} (the identity hash code captured at construction)
 * matches. NOTE(review): presumably this lets a copy of the source that keeps
 * the same {@code id} (e.g. after serialization) find the original data --
 * confirm against the executors using this class.
 *
 * <p>A source is either a <em>root</em> (created through the static factories,
 * {@code partition == -1}) or a <em>split</em> of a root that exposes exactly
 * one of the root's partitions.
 *
 * @param <T> type of the elements produced by this source
 */
public class ListDataSource<T>
        implements BoundedDataSource<T>, UnboundedDataSource<T, Integer> {

    /** Partition data of every live root source; entries vanish once the key is unreachable. */
    private static final Map<ListDataSource<?>, List<List<?>>> storage =
            Collections.synchronizedMap(new WeakHashMap<>());

    private final boolean bounded;
    /** Delay applied by readers on each {@code hasNext()} call that reports an element. */
    private long sleepMs = 0L;
    /** Delay applied by readers when the result of {@code hasNext()} changes. */
    private long finalSleepMs = 0L;
    private final int id = System.identityHashCode(this);
    /** Index of the single partition exposed by a split, or {@code -1} for a root. */
    private final int partition;
    /** Root this split was created from, or {@code null} for a root. */
    private final ListDataSource<T> parent;

    /**
     * Creates a bounded source over the given partitions.
     *
     * @param partitions one list of elements per partition
     * @param <T> element type
     * @return the new bounded source
     */
    @SafeVarargs
    public static <T> ListDataSource<T> bounded(List<T>... partitions) {
        return ListDataSource.of(true, partitions);
    }

    /**
     * Creates an unbounded source over the given partitions.
     *
     * @param partitions one list of elements per partition
     * @param <T> element type
     * @return the new unbounded source
     */
    @SafeVarargs
    public static <T> ListDataSource<T> unbounded(List<T>... partitions) {
        return ListDataSource.of(false, partitions);
    }

    /**
     * Creates a source over the given partitions.
     *
     * @param bounded whether the source reports itself as bounded
     * @param partitions one list of elements per partition
     * @param <T> element type
     * @return the new source
     */
    @SafeVarargs
    public static <T> ListDataSource<T> of(boolean bounded, List<T>... partitions) {
        return ListDataSource.of(bounded, Lists.newArrayList(partitions));
    }

    /**
     * Creates a source over the given partitions.
     *
     * @param bounded whether the source reports itself as bounded
     * @param partitions list of partitions, each a list of elements; the list
     *        is stored as-is (not copied)
     * @param <T> element type
     * @return the new source
     */
    public static <T> ListDataSource<T> of(boolean bounded, List<List<T>> partitions) {
        return new ListDataSource<T>(bounded, partitions);
    }

    /** Creates a root source and registers its data in {@link #storage}. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private ListDataSource(boolean bounded, List<List<T>> partitions) {
        this.bounded = bounded;
        this.parent = null;
        this.partition = -1;
        // The map stores wildcard-typed lists; the raw cast bridges List<List<T>> to List<List<?>>.
        storage.put(this, (List) partitions);
    }

    /** Creates a split of {@code parent} exposing only the given partition. */
    private ListDataSource(ListDataSource<T> parent, int partition) {
        this.bounded = parent.bounded;
        this.parent = parent;
        this.partition = partition;
        // Readers consult the delays of the source that opened them, so a split
        // has to inherit the delays configured on its parent.
        this.sleepMs = parent.sleepMs;
        this.finalSleepMs = parent.finalSleepMs;
    }

    /** Two sources are equal when they carry the same {@code id}. */
    @Override
    public boolean equals(Object o) {
        if (o instanceof ListDataSource) {
            ListDataSource<?> that = (ListDataSource<?>) o;
            return this.id == that.id;
        }
        return false;
    }

    @Override
    public int hashCode() {
        return this.id;
    }

    /** Returns the source whose data is registered in {@link #storage}. */
    private ListDataSource<T> root() {
        return this.parent != null ? this.parent : this;
    }

    /** Returns the number of partitions held by the root source. */
    private int partitionCount() {
        return storage.get(root()).size();
    }

    /** Returns the elements of the root's partition with the given index. */
    @SuppressWarnings("unchecked")
    private List<T> partitionData(int index) {
        // Only List<T> instances are ever registered for a ListDataSource<T>.
        return (List<T>) storage.get(root()).get(index);
    }

    /**
     * Returns one unbounded partition per list of this source. A split exposes
     * only its own partition. The data is looked up when a partition's reader
     * is opened, not when this method is called.
     *
     * @return a new mutable list of partitions
     */
    @Override
    public List<UnboundedPartition<T, Integer>> getPartitions() {
        boolean isRoot = this.partition == -1;
        int from = isRoot ? 0 : this.partition;
        int to = isRoot ? partitionCount() : this.partition + 1;
        List<UnboundedPartition<T, Integer>> partitions = new ArrayList<>(to - from);
        for (int i = from; i < to; i++) {
            final int index = i;
            partitions.add(() -> new UnboundedListReader(partitionData(index)));
        }
        return partitions;
    }

    /**
     * Splits this source into one bounded source per partition. A source that
     * already is a split cannot be divided further and yields only itself.
     *
     * @param desiredSplitBytes ignored; the split always follows the partitions
     * @return a new mutable list of splits
     */
    @Override
    public List<BoundedDataSource<T>> split(long desiredSplitBytes) {
        List<BoundedDataSource<T>> ret = new ArrayList<>();
        if (this.partition != -1) {
            ret.add(this);
            return ret;
        }
        int count = partitionCount();
        for (int i = 0; i < count; i++) {
            ret.add(new ListDataSource<T>(this, i));
        }
        return ret;
    }

    /** Always reports the single location {@code "localhost"}. */
    @Override
    public Set<String> getLocations() {
        return Collections.singleton("localhost");
    }

    /**
     * Opens a reader over a snapshot of the data: all partitions concatenated
     * in order for a root, or the single exposed partition for a split.
     *
     * @return a reader over a copy of the elements taken at call time
     */
    @Override
    public BoundedReader<T> openReader() throws IOException {
        IntStream indices = this.partition == -1
                ? IntStream.range(0, partitionCount())
                : IntStream.of(this.partition);
        List<T> data = indices
                .mapToObj(this::partitionData)
                .flatMap(List::stream)
                .collect(Collectors.toList());
        return new BoundedListReader(data);
    }

    @Override
    public boolean isBounded() {
        return this.bounded;
    }

    /**
     * @return this source
     * @throws UnsupportedOperationException if the source is unbounded
     */
    @Override
    public ListDataSource<T> asBounded() {
        if (this.isBounded()) {
            return this;
        }
        throw new UnsupportedOperationException("Source is unbounded.");
    }

    /**
     * @return this source
     * @throws UnsupportedOperationException if the source is bounded
     */
    public ListDataSource<T> asUnbounded() {
        if (!this.isBounded()) {
            return this;
        }
        throw new UnsupportedOperationException("Source is bounded.");
    }

    /**
     * Sets the delay readers apply on each {@code hasNext()} call that reports
     * an element. Splits created afterwards inherit the value.
     *
     * @param timeout the delay; non-positive values disable it
     * @return this source
     */
    public ListDataSource<T> withReadDelay(Duration timeout) {
        this.sleepMs = timeout.toMillis();
        return this;
    }

    /**
     * Sets the delay readers apply when the outcome of {@code hasNext()}
     * changes, i.e. when the end of the data is first reported. Splits created
     * afterwards inherit the value.
     *
     * @param timeout the delay; non-positive values disable it
     * @return this source
     */
    public ListDataSource<T> withFinalDelay(Duration timeout) {
        this.finalSleepMs = timeout.toMillis();
        return this;
    }

    /** Reader for the unbounded mode; the offset is the index of the next element. */
    private class UnboundedListReader
            extends DataIterator
            implements UnboundedReader<T, Integer> {

        public UnboundedListReader(List<T> data) {
            super(data);
        }

        @Override
        public Integer getCurrentOffset() {
            return this.pos;
        }

        /**
         * Moves the reader to the given offset. Any element prefetched by an
         * earlier {@code hasNext()} is discarded, so {@code hasNext()} must be
         * called again before {@code next()}.
         */
        @Override
        public void reset(Integer offset) {
            this.pos = offset;
            // Drop state tied to the old position; otherwise next() would hand
            // out the element prefetched before the reset.
            this.next = null;
            this.fetched = false;
            this.lastHasNext = true;
        }

        /** No-op: there is nothing to commit for in-memory data. */
        @Override
        public void commitOffset(Integer offset) {
        }
    }

    /** Reader for the bounded mode. */
    private class BoundedListReader
            extends DataIterator
            implements BoundedReader<T> {

        BoundedListReader(List<T> data) {
            super(data);
        }
    }

    /**
     * Iterator over a list that applies the delays configured on the enclosing
     * source. {@code next()} may only be called after a {@code hasNext()} call
     * that returned {@code true}.
     */
    private class DataIterator
            implements CloseableIterator<T> {
        final List<T> data;
        /** Index of the element the next successful {@code hasNext()} will prefetch. */
        int pos = 0;
        /** Result of the previous {@code hasNext()} call; a change triggers the final delay. */
        boolean lastHasNext = true;
        /** Element prefetched by {@code hasNext()}; meaningful only while {@code fetched}. */
        T next = null;
        /** Whether {@code next} holds an element not yet handed out (it may be {@code null}). */
        boolean fetched = false;

        DataIterator(List<T> data) {
            this.data = data;
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public boolean hasNext() {
            boolean hasNext = this.pos < this.data.size();
            if (hasNext != this.lastHasNext) {
                this.lastHasNext = hasNext;
                pause(ListDataSource.this.finalSleepMs);
            }
            if (hasNext) {
                this.next = this.data.get(this.pos);
                this.fetched = true;
                pause(ListDataSource.this.sleepMs);
            }
            return hasNext;
        }

        /**
         * @throws NoSuchElementException if the data is exhausted
         * @throws IllegalStateException if no element was prefetched by a
         *         preceding {@code hasNext()} call
         */
        @Override
        public T next() {
            // A dedicated flag (rather than a null check) keeps null elements readable.
            if (!this.fetched) {
                if (this.pos >= this.data.size()) {
                    throw new NoSuchElementException();
                }
                throw new IllegalStateException("Don't call `next` multiple times withou call to `hasNext`");
            }
            T ret = this.next;
            this.next = null;
            this.fetched = false;
            ++this.pos;
            return ret;
        }

        /** Sleeps for the given time if it is positive; an interrupt is re-flagged on the thread. */
        private void pause(long millis) {
            if (millis <= 0L) {
                return;
            }
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

