package io.flinkspector.datastream.input;

import io.flinkspector.datastream.input.time.Instant;
import io.flinkspector.datastream.input.time.Moment;
import io.flinkspector.datastream.input.time.TimeSpan;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:io/flinkspector/datastream/input/EventTimeInputBuilder.class */
/**
 * Fluent builder for a list of timestamped {@link StreamRecord}s used as event-time input.
 *
 * <p>Every record is stored together with a "shift" value. The shift is added to the record's
 * timestamp whenever the builder needs the reference timestamp for the next relative
 * operation (see {@link #getLastTimestamp()}); it is never written into the record itself.
 *
 * <p>A builder always contains at least one record, because the only way to obtain one is
 * through the {@code startWith} factories.
 *
 * @param <T> type of the values carried by the records
 */
public class EventTimeInputBuilder<T> implements EventTimeInput<T> {
    /** Records in insertion order, each paired with its shift. */
    private final List<Pair<StreamRecord<T>, Long>> input = new ArrayList<>();

    /** Value reported by {@link #getFlushWindowsSetting()}. */
    private boolean flushWindows = false;

    private EventTimeInputBuilder(StreamRecord<T> record) {
        add(record);
    }

    private EventTimeInputBuilder(StreamRecord<T> record, long shift) {
        addWithShift(record, shift);
    }

    /**
     * Creates a builder whose first record carries {@code elem} with timestamp 0.
     *
     * @param elem value of the first record
     * @param <T> value type
     * @return a new builder
     * @throws IllegalArgumentException if {@code elem} is null
     */
    public static <T> EventTimeInputBuilder<T> startWith(T elem) {
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return new EventTimeInputBuilder<>(new StreamRecord<>(elem, 0L));
    }

    /**
     * Creates a builder whose first record carries {@code elem} with the given timestamp.
     *
     * @param elem value of the first record
     * @param timestamp timestamp of the first record
     * @param <T> value type
     * @return a new builder
     * @throws IllegalArgumentException if {@code elem} is null
     */
    public static <T> EventTimeInputBuilder<T> startWith(T elem, long timestamp) {
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return new EventTimeInputBuilder<>(new StreamRecord<>(elem, timestamp));
    }

    /**
     * Creates a builder whose first record carries {@code elem}; its timestamp is
     * {@code moment.getTimestamp(0)} and its shift is {@code moment.getShift()}.
     *
     * @param elem value of the first record
     * @param moment supplies timestamp and shift of the first record
     * @param <T> value type
     * @return a new builder
     * @throws IllegalArgumentException if {@code elem} or {@code moment} is null
     */
    public static <T> EventTimeInputBuilder<T> startWith(T elem, Moment moment) {
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        if (moment == null) {
            throw new IllegalArgumentException("Moment has to be not null!");
        }
        return new EventTimeInputBuilder<>(
                new StreamRecord<>(elem, moment.getTimestamp(0L)), moment.getShift());
    }

    /**
     * Creates a builder whose first record is the given record (stored with shift 0).
     *
     * @param record first record
     * @param <T> value type
     * @return a new builder
     * @throws IllegalArgumentException if {@code record} is null
     */
    public static <T> EventTimeInputBuilder<T> startWith(StreamRecord<T> record) {
        if (record == null) {
            throw new IllegalArgumentException("Record has to be not null!");
        }
        return new EventTimeInputBuilder<>(record);
    }

    /** Appends a record with shift 0. */
    private void add(StreamRecord<T> record) {
        addWithShift(record, 0L);
    }

    /** Appends a record together with its shift. */
    private void addWithShift(StreamRecord<T> record, long shift) {
        this.input.add(Pair.of(record, shift));
    }

    /** Wraps {@code elem} in a new record with the given timestamp and appends it with its shift. */
    private void addWithShift(T elem, long timestamp, long shift) {
        addWithShift(new StreamRecord<>(elem, timestamp), shift);
    }

    /** Returns the record timestamp plus the shift stored in {@code entry}. */
    private long shiftedTimestamp(Pair<StreamRecord<T>, Long> entry) {
        return entry.getLeft().getTimestamp() + entry.getRight();
    }

    /** Returns the shifted timestamp of the most recently added record. */
    private long getLastTimestamp() {
        return shiftedTimestamp(this.input.get(this.input.size() - 1));
    }

    /**
     * Emits {@code elem} {@code times} times by delegating to
     * {@link #emit(Object, Moment, int)} with a new {@link Instant}.
     *
     * <p>Note that the delegate rejects {@code times == 0}, so the effective minimum is 1.
     *
     * @param elem value to emit
     * @param times number of records to add
     * @return this builder
     * @throws IllegalArgumentException if {@code times} is less than 1 or {@code elem} is null
     */
    public EventTimeInputBuilder<T> emit(T elem, int times) {
        if (times < 0) {
            throw new IllegalArgumentException("negative times: " + times);
        }
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        return emit(elem, new Instant(), times);
    }

    /**
     * Emits {@code elem} at the shifted timestamp of the last record, with shift 0.
     *
     * @param elem value to emit
     * @return this builder
     * @throws IllegalArgumentException if {@code elem} is null
     */
    public EventTimeInputBuilder<T> emit(T elem) {
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        add(new StreamRecord<>(elem, getLastTimestamp()));
        return this;
    }

    /**
     * Emits {@code elem} at {@code moment.getTimestamp(lastTimestamp)}, storing
     * {@code moment.getShift()} as the record's shift.
     *
     * @param elem value to emit
     * @param moment supplies timestamp (relative to the last shifted timestamp) and shift
     * @return this builder
     * @throws IllegalArgumentException if {@code moment} or {@code elem} is null
     */
    public EventTimeInputBuilder<T> emit(T elem, Moment moment) {
        if (moment == null) {
            throw new IllegalArgumentException("TimeBetween has to bo not null!");
        }
        // Same contract as the other emit overloads: null elements are rejected.
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        addWithShift(
                new StreamRecord<>(elem, moment.getTimestamp(getLastTimestamp())),
                moment.getShift());
        return this;
    }

    /**
     * Emits an already constructed record, stored with shift 0.
     *
     * @param record record to append
     * @return this builder
     * @throws IllegalArgumentException if {@code record} is null
     */
    public EventTimeInputBuilder<T> emit(StreamRecord<T> record) {
        if (record == null) {
            throw new IllegalArgumentException("Record has to be not null!");
        }
        add(record);
        return this;
    }

    /**
     * Emits {@code elem} {@code times} times. Starting from the shifted timestamp of the last
     * record, each repetition feeds the previously produced timestamp into
     * {@code moment.getTimestamp(...)}; the shift is not added between repetitions.
     *
     * @param elem value to emit
     * @param moment supplies each timestamp and the shift stored with each record
     * @param times number of records to add, at least 1
     * @return this builder
     * @throws IllegalArgumentException if {@code moment} or {@code elem} is null, or
     *     {@code times} is less than 1
     */
    public EventTimeInputBuilder<T> emit(T elem, Moment moment, int times) {
        if (moment == null) {
            throw new IllegalArgumentException("TimeBetween has to bo not null!");
        }
        if (times < 1) {
            throw new IllegalArgumentException("Times has to be at least 1.");
        }
        // Same contract as the other emit overloads: null elements are rejected.
        if (elem == null) {
            throw new IllegalArgumentException("Elem has to be not null!");
        }
        long timestamp = getLastTimestamp();
        for (int n = 0; n < times; n++) {
            timestamp = moment.getTimestamp(timestamp);
            addWithShift(elem, timestamp, moment.getShift());
        }
        return this;
    }

    /**
     * Repeats the current input {@code times} times by delegating to
     * {@link #repeatAll(TimeSpan, int)} with a new {@link Instant}.
     *
     * @param times number of repetitions; values below 1 add nothing
     * @return this builder
     */
    public EventTimeInputBuilder<T> repeatAll(int times) {
        return repeatAll(new Instant(), times);
    }

    /**
     * Appends {@code times} copies of the input as it is when this method is called.
     *
     * <p>The first copy starts from the shifted timestamp of the last record; each following
     * copy starts from the shifted timestamp of the last record of the previous copy. The
     * offset between start and first record of a copy is {@code timeSpan.getTimestamp(0)}.
     *
     * @param timeSpan supplies the offset placed in front of every copy
     * @param times number of repetitions; values below 1 add nothing
     * @return this builder
     */
    public EventTimeInputBuilder<T> repeatAll(TimeSpan timeSpan, int times) {
        long start = getLastTimestamp();
        // Collected separately so that every repetition copies the original input only.
        List<Pair<StreamRecord<T>, Long>> repeated = new ArrayList<>();
        for (int n = 0; n < times; n++) {
            repeated.addAll(repeatInput(timeSpan.getTimestamp(0L), start));
            start = shiftedTimestamp(repeated.get(repeated.size() - 1));
        }
        this.input.addAll(repeated);
        return this;
    }

    /**
     * Switches the flush-windows setting on.
     *
     * @return this builder
     */
    public EventTimeInputBuilder<T> flushOpenWindowsOnTermination() {
        this.flushWindows = true;
        return this;
    }

    /**
     * Returns one line per record in the form {@code value: <value> timestamp: <timestamp>}.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (Pair<StreamRecord<T>, Long> entry : this.input) {
            StreamRecord<T> record = entry.getLeft();
            // Pair#getValue() would return the right element (the shift), not the record value.
            sb.append("value: ").append(record.getValue())
                    .append(" timestamp: ").append(record.getTimestamp())
                    .append('\n');
        }
        return sb.toString();
    }

    @Override // io.flinkspector.datastream.input.EventTimeInput
    public Boolean getFlushWindowsSetting() {
        return this.flushWindows;
    }

    /**
     * Returns the records in insertion order, without their shifts, in a newly created list.
     */
    @Override // io.flinkspector.datastream.input.EventTimeInput
    public List<StreamRecord<T>> getInput() {
        List<StreamRecord<T>> records = new ArrayList<>(this.input.size());
        for (Pair<StreamRecord<T>, Long> entry : this.input) {
            records.add(entry.getLeft());
        }
        return records;
    }

    /**
     * Builds one copy of the current input, re-timed to begin at {@code startTimestamp + time}.
     *
     * <p>The first record is copied to {@code startTimestamp + time}. Every following record is
     * placed after its predecessor's copy by the gap between the two original timestamps; when
     * both the predecessor and the record have a positive shift, the record's shift is
     * subtracted from that gap. Only the last copied record keeps its original shift, all
     * others are stored with shift 0.
     *
     * @param time offset added to {@code startTimestamp} for the first copied record
     * @param startTimestamp timestamp the copy is positioned relative to
     * @return the re-timed copy; the builder's own input is not modified
     * @throws UnsupportedOperationException if a record after the first would get a negative
     *     timestamp
     */
    private List<Pair<StreamRecord<T>, Long>> repeatInput(long time, long startTimestamp) {
        List<Pair<StreamRecord<T>, Long>> copies = new ArrayList<>(this.input.size());
        Iterator<Pair<StreamRecord<T>, Long>> it = this.input.iterator();

        // The builder is never empty, so the first element always exists.
        Pair<StreamRecord<T>, Long> first = it.next();
        long current = startTimestamp + time;
        copies.add(Pair.of(copyRecord(first.getLeft(), current), 0L));

        long previousTimestamp = first.getLeft().getTimestamp();
        long previousShift = first.getRight();

        while (it.hasNext()) {
            Pair<StreamRecord<T>, Long> entry = it.next();
            long shift = entry.getRight();
            long gap = entry.getLeft().getTimestamp() - previousTimestamp;
            if (previousShift > 0 && shift > 0) {
                gap -= shift;
            }
            long next = current + gap;
            if (next < 0) {
                throw new UnsupportedOperationException("Negative timestamp: " + next);
            }
            // Only the final record of the copy carries its shift over.
            Long copiedShift = it.hasNext() ? Long.valueOf(0L) : entry.getRight();
            copies.add(Pair.of(copyRecord(entry.getLeft(), next), copiedShift));

            current = next;
            previousTimestamp = entry.getLeft().getTimestamp();
            previousShift = shift;
        }
        return copies;
    }

    /** Creates a new record with the value of {@code record} and the given timestamp. */
    private StreamRecord<T> copyRecord(StreamRecord<T> record, long timestamp) {
        return new StreamRecord<>(record.getValue(), timestamp);
    }
}
