package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;

/**
 * Input processor for operators with two inputs.
 *
 * <p>All input gates of both inputs are combined into a single gate via
 * {@code InputGateUtil.createInputGate}. Buffers read from that gate are handed to a
 * per-channel {@link RecordDeserializer}; every fully deserialized {@link StreamElement} is
 * either a record, which is forwarded to the operator's {@code processElement1/2}, or a
 * {@link Watermark}, which is folded into the per-input watermark state.
 *
 * <p>A channel belongs to the first input when its index is smaller than
 * {@code numInputChannels1}; all other channels belong to the second input.
 * NOTE(review): this relies on the combined gate numbering the channels of the first
 * collection before those of the second - confirm against {@code InputGateUtil}.
 *
 * <p>For each input the highest watermark seen per channel is tracked. A watermark is
 * forwarded to the operator only when the minimum across all channels of that input
 * advances past the last watermark emitted for that input.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 */
@Internal
public class StreamTwoInputProcessor<IN1, IN2> {

    /** One deserializer per channel of the combined input gate, indexed by channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

    /** Deserializer that still holds an unconsumed buffer, or {@code null} if there is none. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;

    /** Channel index of the buffer most recently handed to a deserializer; -1 before the first buffer. */
    private int currentChannel = -1;

    /** Set once the barrier handler has reported that no more input is available. */
    private boolean isFinished;

    private final CheckpointBarrierHandler barrierHandler;

    /** Highest watermark seen so far on each channel of the first input. */
    private final long[] watermarks1;
    private long lastEmittedWatermark1;

    /** Highest watermark seen so far on each channel of the second input. */
    private final long[] watermarks2;
    private long lastEmittedWatermark2;

    /** Number of channels that belong to the first input. */
    private final int numInputChannels1;

    private final DeserializationDelegate<StreamElement> deserializationDelegate1;
    private final DeserializationDelegate<StreamElement> deserializationDelegate2;

    /**
     * Creates the processor and its per-channel deserializers.
     *
     * @param collection input gates of the first input
     * @param collection2 input gates of the second input
     * @param typeSerializer serializer for elements of the first input
     * @param typeSerializer2 serializer for elements of the second input
     * @param eventListener listener registered with the barrier handler as checkpoint event
     *     handler; skipped when {@code null}
     * @param checkpointingMode {@code EXACTLY_ONCE} selects a {@code BarrierBuffer},
     *     {@code AT_LEAST_ONCE} selects a {@code BarrierTracker}
     * @param iOManager supplies the spilling directories for the deserializers (and is passed
     *     to the {@code BarrierBuffer})
     * @param z when {@code true} elements are read with a
     *     {@link MultiplexingStreamRecordSerializer}, otherwise with a
     *     {@link StreamRecordSerializer}
     * @throws IllegalArgumentException if {@code checkpointingMode} is neither of the two
     *     supported modes
     * @throws IOException declared by the signature; presumably raised while setting up the
     *     barrier handler - not visible from this class
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // generic array creation for the deserializers
    public StreamTwoInputProcessor(Collection<InputGate> collection, Collection<InputGate> collection2, TypeSerializer<IN1> typeSerializer, TypeSerializer<IN2> typeSerializer2, EventListener<CheckpointBarrier> eventListener, CheckpointingMode checkpointingMode, IOManager iOManager, boolean z) throws IOException {
        final InputGate unionGate = InputGateUtil.createInputGate(collection, collection2);

        if (checkpointingMode == CheckpointingMode.EXACTLY_ONCE) {
            this.barrierHandler = new BarrierBuffer(unionGate, iOManager);
        } else if (checkpointingMode == CheckpointingMode.AT_LEAST_ONCE) {
            this.barrierHandler = new BarrierTracker(unionGate);
        } else {
            throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointingMode);
        }

        if (eventListener != null) {
            this.barrierHandler.registerCheckpointEventHandler(eventListener);
        }

        this.deserializationDelegate1 = createDelegate(typeSerializer, z);
        this.deserializationDelegate2 = createDelegate(typeSerializer2, z);

        final int totalChannels = unionGate.getNumberOfInputChannels();
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[totalChannels];
        for (int channel = 0; channel < totalChannels; channel++) {
            this.recordDeserializers[channel] = new SpillingAdaptiveSpanningRecordDeserializer(iOManager.getSpillingDirectoriesPaths());
        }

        int channelsOfFirstInput = 0;
        for (InputGate gate : collection) {
            channelsOfFirstInput += gate.getNumberOfInputChannels();
        }
        this.numInputChannels1 = channelsOfFirstInput;

        // Whatever is not part of the first input belongs to the second one.
        this.watermarks1 = new long[channelsOfFirstInput];
        Arrays.fill(this.watermarks1, Long.MIN_VALUE);
        this.lastEmittedWatermark1 = Long.MIN_VALUE;

        this.watermarks2 = new long[totalChannels - channelsOfFirstInput];
        Arrays.fill(this.watermarks2, Long.MIN_VALUE);
        this.lastEmittedWatermark2 = Long.MIN_VALUE;
    }

    /** Builds the non-reusing deserialization delegate for one input. */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw serializer wrappers, as in the original code
    private static DeserializationDelegate<StreamElement> createDelegate(TypeSerializer<?> serializer, boolean multiplexing) {
        if (multiplexing) {
            return new NonReusingDeserializationDelegate(new MultiplexingStreamRecordSerializer(serializer));
        }
        return new NonReusingDeserializationDelegate(new StreamRecordSerializer(serializer));
    }

    /**
     * Processes input until exactly one record has been handed to the operator or the input
     * is exhausted. Watermarks encountered on the way are handled without returning.
     *
     * @param twoInputStreamOperator operator that receives records and watermarks
     * @param obj lock object; every call into the operator is made while synchronized on it
     * @return {@code true} if a record was processed, {@code false} if the input is finished
     * @throws IllegalStateException if the input ended but the barrier handler is not empty
     * @throws IOException if an event other than {@link EndOfPartitionEvent} is received
     * @throws Exception anything thrown by the operator or the underlying readers
     */
    public boolean processInput(TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, Object obj) throws Exception {
        if (this.isFinished) {
            return false;
        }

        while (true) {
            if (this.currentRecordDeserializer != null) {
                final boolean fromFirstInput = this.currentChannel < this.numInputChannels1;
                final DeserializationDelegate<StreamElement> delegate =
                        fromFirstInput ? this.deserializationDelegate1 : this.deserializationDelegate2;

                final RecordDeserializer.DeserializationResult result =
                        this.currentRecordDeserializer.getNextRecord(delegate);

                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycle();
                    this.currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    final StreamElement element = delegate.getInstance();

                    if (element.isWatermark()) {
                        handleWatermark(twoInputStreamOperator, element.asWatermark(), this.currentChannel, obj);
                        // Go back to the current deserializer before requesting new input:
                        // it may still hold further elements of the same buffer, and handing
                        // it another buffer now would interfere with the unread remainder.
                        continue;
                    }

                    synchronized (obj) {
                        if (fromFirstInput) {
                            twoInputStreamOperator.setKeyContextElement1(element.asRecord());
                            twoInputStreamOperator.processElement1(element.asRecord());
                        } else {
                            twoInputStreamOperator.setKeyContextElement2(element.asRecord());
                            twoInputStreamOperator.processElement2(element.asRecord());
                        }
                    }
                    return true;
                }
            }

            final BufferOrEvent next = this.barrierHandler.getNextNonBlocked();
            if (next == null) {
                this.isFinished = true;
                if (!this.barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }

            if (next.isBuffer()) {
                this.currentChannel = next.getChannelIndex();
                this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
                this.currentRecordDeserializer.setNextBuffer(next.getBuffer());
            } else {
                // The only event tolerated at this level is the end-of-partition marker.
                final AbstractEvent event = next.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        }
    }

    /**
     * Records a watermark for the given channel and, if the minimum watermark over all
     * channels of the owning input advanced, forwards that minimum to the operator.
     *
     * @param operator operator that receives the forwarded watermark
     * @param watermark watermark read from the channel
     * @param channelIndex channel index within the combined gate
     * @param lock lock object held while calling into the operator
     */
    private void handleWatermark(TwoInputStreamOperator<IN1, IN2, ?> operator, Watermark watermark, int channelIndex, Object lock) throws Exception {
        final long timestamp = watermark.getTimestamp();

        if (channelIndex < this.numInputChannels1) {
            if (timestamp <= this.watermarks1[channelIndex]) {
                return; // not an advance for this channel
            }
            this.watermarks1[channelIndex] = timestamp;

            final long inputMin = minOf(this.watermarks1);
            if (inputMin > this.lastEmittedWatermark1) {
                this.lastEmittedWatermark1 = inputMin;
                synchronized (lock) {
                    operator.processWatermark1(new Watermark(this.lastEmittedWatermark1));
                }
            }
        } else {
            // Channels of the second input are stored relative to the end of the first input.
            final int localIndex = channelIndex - this.numInputChannels1;
            if (timestamp <= this.watermarks2[localIndex]) {
                return; // not an advance for this channel
            }
            this.watermarks2[localIndex] = timestamp;

            final long inputMin = minOf(this.watermarks2);
            if (inputMin > this.lastEmittedWatermark2) {
                this.lastEmittedWatermark2 = inputMin;
                synchronized (lock) {
                    operator.processWatermark2(new Watermark(this.lastEmittedWatermark2));
                }
            }
        }
    }

    /** Returns the smallest value in {@code values}, or {@code Long.MAX_VALUE} if it is empty. */
    private static long minOf(long[] values) {
        long min = Long.MAX_VALUE;
        for (long value : values) {
            min = Math.min(value, min);
        }
        return min;
    }

    /**
     * Passes the accumulator reporter on to every channel deserializer.
     *
     * @param reporter reporter to install
     */
    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            deserializer.setReporter(reporter);
        }
    }

    /**
     * Registers the {@code currentLowWatermark} gauge, which reports the smaller of the last
     * watermarks emitted for the two inputs.
     *
     * <p>NOTE(review): the watermark fields are plain (non-volatile) fields; if the gauge is
     * polled from another thread the reported value may lag - confirm whether that matters.
     *
     * @param iOMetricGroup metric group to register the gauge with
     */
    public void setMetricGroup(IOMetricGroup iOMetricGroup) {
        iOMetricGroup.gauge("currentLowWatermark", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return Math.min(StreamTwoInputProcessor.this.lastEmittedWatermark1, StreamTwoInputProcessor.this.lastEmittedWatermark2);
            }
        });
    }

    /**
     * Recycles any buffer still held by a channel deserializer and cleans up the barrier
     * handler.
     *
     * @throws IOException declared by the signature; presumably propagated from the barrier
     *     handler's cleanup - not visible from this class
     */
    public void cleanup() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            final Buffer pending = deserializer.getCurrentBuffer();
            if (pending != null && !pending.isRecycled()) {
                pending.recycle();
            }
        }
        this.barrierHandler.cleanup();
    }
}
