/*
 * Decompiled with CFR 0.152.
 */
package io.flinkspector.datastream.input;

import io.flinkspector.datastream.input.EventTimeInput;
import io.flinkspector.datastream.input.time.Instant;
import io.flinkspector.datastream.input.time.Moment;
import io.flinkspector.datastream.input.time.TimeSpan;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class EventTimeInputBuilder<T>
implements EventTimeInput<T> {
    private ArrayList<Pair<StreamRecord<T>, Long>> input = new ArrayList();
    private Boolean flushWindows = false;

    private EventTimeInputBuilder(StreamRecord<T> record) {
        this.add(record);
    }

    private EventTimeInputBuilder(StreamRecord<T> record, Long shift) {
        this.addWithShift(record, shift);
    }

    public static <T> EventTimeInputBuilder<T> startWith(T record) {
        if (record == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return new EventTimeInputBuilder<T>(new StreamRecord(record, 0L));
    }

    public static <T> EventTimeInputBuilder<T> startWith(T record, long timeStamp) {
        if (record == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return new EventTimeInputBuilder<T>(new StreamRecord(record, timeStamp));
    }

    public static <T> EventTimeInputBuilder<T> startWith(T record, Moment moment) {
        if (record == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return new EventTimeInputBuilder<T>(new StreamRecord(record, moment.getTimestamp(0L)), moment.getShift());
    }

    public static <T> EventTimeInputBuilder<T> startWith(StreamRecord<T> streamRecord) {
        if (streamRecord == null) {
            throw new IllegalArgumentException("Record has to be not null!");
        }
        return new EventTimeInputBuilder<T>(streamRecord);
    }

    private void add(StreamRecord<T> record) {
        this.input.add(Pair.of(record, (Object)0L));
    }

    private void addWithShift(StreamRecord<T> record, Long shift) {
        this.input.add(Pair.of(record, (Object)shift));
    }

    private void addWithShift(T elem, Long timestamp, Long shift) {
        this.input.add(Pair.of((Object)new StreamRecord(elem, timestamp.longValue()), (Object)shift));
    }

    private Long getLastTimestamp() {
        Pair<StreamRecord<T>, Long> lastRecord = this.input.get(this.input.size() - 1);
        return ((StreamRecord)lastRecord.getLeft()).getTimestamp() + (Long)lastRecord.getRight();
    }

    public EventTimeInputBuilder<T> emit(T record, int times) {
        if (times < 0) {
            throw new IllegalArgumentException("negative times: " + times);
        }
        if (record == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        this.emit(record, new Instant(), times);
        return this;
    }

    public EventTimeInputBuilder<T> emit(T record) {
        if (record == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        this.add(new StreamRecord(record, this.getLastTimestamp().longValue()));
        return this;
    }

    public EventTimeInputBuilder<T> emit(T record, Moment timeSpan) {
        if (timeSpan == null) {
            throw new IllegalArgumentException("TimeBetween has to bo not null!");
        }
        long newTimeStamp = timeSpan.getTimestamp(this.getLastTimestamp());
        this.addWithShift(new StreamRecord(record, newTimeStamp), timeSpan.getShift());
        return this;
    }

    public EventTimeInputBuilder<T> emit(StreamRecord<T> streamRecord) {
        if (streamRecord == null) {
            throw new IllegalArgumentException("Record has to be not null!");
        }
        this.add(streamRecord);
        return this;
    }

    public EventTimeInputBuilder<T> emit(T elem, Moment moment, int times) {
        if (moment == null) {
            throw new IllegalArgumentException("TimeBetween has to bo not null!");
        }
        if (times < 1) {
            throw new IllegalArgumentException("Times has to be greater than 1.");
        }
        long ts = this.getLastTimestamp();
        for (int i = 0; i < times; ++i) {
            ts = moment.getTimestamp(ts);
            this.addWithShift(elem, ts, moment.getShift());
        }
        return this;
    }

    private long calculateShiftDifference(Pair<StreamRecord<T>, Long> entry) {
        return ((StreamRecord)entry.getLeft()).getTimestamp() + (Long)entry.getRight();
    }

    public EventTimeInputBuilder<T> repeatAll(int times) {
        this.repeatAll(new Instant(), times);
        return this;
    }

    public EventTimeInputBuilder<T> repeatAll(TimeSpan timeSpan, int times) {
        long start = this.getLastTimestamp();
        ArrayList<Pair<StreamRecord<T>, Long>> toAppend = new ArrayList<Pair<StreamRecord<T>, Long>>();
        for (int i = 0; i < times; ++i) {
            toAppend.addAll(this.repeatInput(timeSpan.getTimestamp(0L), start));
            start = this.calculateShiftDifference((Pair)toAppend.get(toAppend.size() - 1));
        }
        this.input.addAll(toAppend);
        return this;
    }

    public EventTimeInputBuilder<T> flushOpenWindowsOnTermination() {
        this.flushWindows = true;
        return this;
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        for (Pair<StreamRecord<T>, Long> r : this.input) {
            builder.append("value: " + r.getValue() + " timestamp: " + ((StreamRecord)r.getLeft()).getTimestamp() + "\n");
        }
        return builder.toString();
    }

    @Override
    public Boolean getFlushWindowsSetting() {
        return this.flushWindows;
    }

    @Override
    public List<StreamRecord<T>> getInput() {
        ArrayList<StreamRecord<T>> records = new ArrayList<StreamRecord<T>>();
        for (Pair<StreamRecord<T>, Long> r : this.input) {
            records.add((StreamRecord<T>)r.getLeft());
        }
        return records;
    }

    private List<Pair<StreamRecord<T>, Long>> repeatInput(long time, long startTimeStamp) {
        ArrayList<Pair<StreamRecord<T>, Long>> append = new ArrayList<Pair<StreamRecord<T>, Long>>();
        Iterator<Pair<StreamRecord<T>, Long>> it = this.input.iterator();
        long last = startTimeStamp;
        long delta = time;
        Pair<StreamRecord<T>, Long> record = it.next();
        append.add(Pair.of(this.copyRecord((StreamRecord)record.getLeft(), last + delta), (Object)0L));
        long previous = ((StreamRecord)record.getLeft()).getTimestamp();
        long previousShift = (Long)record.getRight();
        last += delta;
        while (it.hasNext()) {
            record = it.next();
            delta = ((StreamRecord)record.getLeft()).getTimestamp() - previous;
            if (previousShift > 0L && (Long)record.getRight() > 0L) {
                delta -= ((Long)record.getRight()).longValue();
            }
            if (last + delta < 0L) {
                throw new UnsupportedOperationException("Negative timestamp: " + last + delta);
            }
            if (it.hasNext()) {
                append.add(Pair.of(this.copyRecord((StreamRecord)record.getLeft(), last + delta), (Object)0L));
            } else {
                append.add(Pair.of(this.copyRecord((StreamRecord)record.getLeft(), last + delta), (Object)record.getRight()));
            }
            last += delta;
            previous = ((StreamRecord)record.getLeft()).getTimestamp();
            previousShift = (Long)record.getRight();
        }
        return append;
    }

    private StreamRecord<T> copyRecord(StreamRecord<T> record, Long timestamp) {
        return new StreamRecord(record.getValue(), timestamp.longValue());
    }
}

