package io.pravega.connectors.flink;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:io/pravega/connectors/flink/EventTimeOrderingOperator.class */
/**
 * Stream operator that buffers timestamped elements and re-emits them in ascending
 * timestamp order once the watermark has reached their timestamp.
 *
 * <p>Elements are held in a {@link MapState} that maps an event timestamp to the list of
 * elements carrying that timestamp. Elements sharing a timestamp are emitted in the order
 * in which they were buffered. Elements without a timestamp are forwarded immediately, and
 * elements whose timestamp is not greater than the last seen watermark are discarded.
 *
 * <p>NOTE(review): the buffer is obtained through {@code getRuntimeContext().getMapState(...)};
 * this presumably requires the operator to run on a keyed stream - confirm against the caller.
 *
 * @param <K> the key type of the timers delivered to this operator
 * @param <T> the type of the elements being ordered
 */
public class EventTimeOrderingOperator<K, T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T>, Triggerable<K, VoidNamespace>, InputTypeConfigurable {
    private static final long serialVersionUID = 1;

    /** Name under which the buffered elements are registered as state. */
    private static final String EVENT_QUEUE_STATE_NAME = "eventQueue";

    /**
     * Timestamp of the last watermark passed to {@link #processWatermark(Watermark)}.
     * Used to decide whether an incoming element is still eligible for buffering.
     */
    @VisibleForTesting
    long lastWatermark = Long.MIN_VALUE;

    /** Serializer for the input elements; used for the state descriptor and for defensive copies. */
    private TypeSerializer<T> inputSerializer;

    /** Timer service used to schedule the event-time callbacks that flush the buffer. */
    private transient InternalTimerService<VoidNamespace> internalTimerService;

    /** Buffered elements, grouped by their event timestamp. */
    private transient MapState<Long, List<T>> elementQueueState;

    /** Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}. */
    public EventTimeOrderingOperator() {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Implements {@link InputTypeConfigurable}: derives the element serializer from the
     * supplied input type information.
     *
     * @param typeInformation the type information of the input elements
     * @param executionConfig the execution configuration used to create the serializer
     */
    @SuppressWarnings("unchecked")
    public void setInputType(TypeInformation<?> typeInformation, ExecutionConfig executionConfig) {
        // The wildcard type cannot be checked against T at compile time; the cast is
        // unavoidable with this interface signature.
        this.inputSerializer = (TypeSerializer<T>) typeInformation.createSerializer(executionConfig);
    }

    /**
     * Initializes operator state and, if not already present, obtains the map state that
     * holds the buffered elements.
     *
     * @param stateInitializationContext the context handed to the superclass initialization
     * @throws Exception if the superclass initialization or the state lookup fails
     */
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        if (this.elementQueueState == null) {
            MapStateDescriptor<Long, List<T>> descriptor = new MapStateDescriptor<>(
                    EVENT_QUEUE_STATE_NAME,
                    LongSerializer.INSTANCE,
                    new ListSerializer<>(this.inputSerializer));
            this.elementQueueState = getRuntimeContext().getMapState(descriptor);
        }
    }

    /**
     * Opens the operator and obtains the timer service, registering this operator as the
     * timer callback target.
     *
     * @throws Exception if the superclass fails to open
     */
    public void open() throws Exception {
        super.open();
        this.internalTimerService = getInternalTimerService("ordering-timers", VoidNamespaceSerializer.INSTANCE, this);
    }

    /**
     * Handles one incoming element: forwards it if it has no timestamp, buffers it if its
     * timestamp is greater than the last seen watermark, and otherwise discards it.
     *
     * @param streamRecord the incoming element
     * @throws Exception if accessing the buffer state fails
     */
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        if (!streamRecord.hasTimestamp()) {
            // Without a timestamp there is nothing to order by; pass the element straight through.
            this.output.collect(streamRecord);
            return;
        }
        // Elements at or before the last watermark are treated as late and are not emitted.
        if (streamRecord.getTimestamp() > this.lastWatermark) {
            saveRegisterWatermarkTimer();
            bufferEvent(streamRecord);
        }
    }

    /**
     * Delegates to the superclass watermark handling and then records the watermark's
     * timestamp as the last seen watermark.
     *
     * @param watermark the incoming watermark
     * @throws Exception if the superclass handling fails
     */
    public void processWatermark(Watermark watermark) throws Exception {
        super.processWatermark(watermark);
        this.lastWatermark = watermark.getTimestamp();
    }

    /**
     * Appends the element's value to the list stored under the element's timestamp.
     *
     * @param streamRecord the element to buffer; must carry a timestamp
     * @throws Exception if reading or writing the buffer state fails
     */
    private void bufferEvent(StreamRecord<T> streamRecord) throws Exception {
        assert streamRecord.hasTimestamp();
        long timestamp = streamRecord.getTimestamp();
        List<T> bucket = this.elementQueueState.get(timestamp);
        if (bucket == null) {
            bucket = new ArrayList<>(1);
        }
        if (getRuntimeContext().getExecutionConfig().isObjectReuseEnabled()) {
            // With object reuse the incoming instance may be overwritten later, so keep a copy.
            bucket.add(this.inputSerializer.copy(streamRecord.getValue()));
        } else {
            bucket.add(streamRecord.getValue());
        }
        this.elementQueueState.put(timestamp, bucket);
    }

    /**
     * Registers an event-time timer for one past the timer service's current watermark.
     * No timer is registered when the current watermark is {@link Long#MAX_VALUE},
     * because the target timestamp would overflow.
     */
    private void saveRegisterWatermarkTimer() {
        long currentWatermark = this.internalTimerService.currentWatermark();
        if (currentWatermark != Long.MAX_VALUE) {
            this.internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, currentWatermark + 1);
        }
    }

    /**
     * Implements {@link Triggerable}: emits, in ascending timestamp order, every buffered
     * element whose timestamp is not greater than the current watermark, and removes it
     * from the buffer. If elements remain buffered afterwards, another timer is registered;
     * otherwise the buffer state is cleared.
     *
     * @param internalTimer the timer that fired (not inspected)
     * @throws Exception if accessing the buffer state fails
     */
    public void onEventTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        long currentWatermark = this.internalTimerService.currentWatermark();
        PriorityQueue<Long> sortedTimestamps = getSortedTimestamps();
        while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= currentWatermark) {
            long timestamp = sortedTimestamps.poll();
            for (T event : this.elementQueueState.get(timestamp)) {
                this.output.collect(new StreamRecord<>(event, timestamp));
            }
            this.elementQueueState.remove(timestamp);
        }
        if (sortedTimestamps.isEmpty()) {
            this.elementQueueState.clear();
        } else {
            // Later elements are still waiting; make sure a future watermark flushes them.
            saveRegisterWatermarkTimer();
        }
    }

    /**
     * Implements {@link Triggerable}: processing-time timers are not used by this operator,
     * so this callback does nothing.
     *
     * @param internalTimer the timer that fired (ignored)
     * @throws Exception never thrown by this implementation
     */
    public void onProcessingTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
    }

    /**
     * Collects all timestamps that currently have buffered elements into a min-ordered queue.
     *
     * @return a priority queue whose head is the smallest buffered timestamp
     * @throws Exception if reading the buffer state fails
     */
    private PriorityQueue<Long> getSortedTimestamps() throws Exception {
        PriorityQueue<Long> sortedTimestamps = new PriorityQueue<>();
        for (Long timestamp : this.elementQueueState.keys()) {
            sortedTimestamps.offer(timestamp);
        }
        return sortedTimestamps;
    }
}
