/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.core;

import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.util.ExceptionUtils;
import cz.o2.proxima.direct.core.LogObserver;
import cz.o2.proxima.flink.core.AbstractSourceLogObserver;
import cz.o2.proxima.flink.core.ResultExtractor;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractLogSourceFunction<ReaderT, ObserverT extends AbstractSourceLogObserver<OffsetT, ContextT, OutputT>, OffsetT extends Serializable, ContextT extends LogObserver.OnNextContext<OffsetT>, OutputT>
extends RichParallelSourceFunction<OutputT>
implements CheckpointListener,
CheckpointedFunction {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractLogSourceFunction.class);
    private static final String OFFSETS_STATE_NAME = "offsets";
    private final RepositoryFactory repositoryFactory;
    private final List<AttributeDescriptor<?>> attributeDescriptors;
    private final ResultExtractor<OutputT> resultExtractor;
    @Nullable
    private transient List<OffsetT> restoredOffsets;
    @Nullable
    private transient ListState<OffsetT> persistedOffsets;
    @Nullable
    private volatile transient UnifiedObserveHandle<OffsetT> observeHandle;
    private volatile transient CountDownLatch running;
    private volatile transient CountDownLatch cancelled;

    private static int getSubtaskIndex(Partition partition, int numParallelSubtasks) {
        return partition.getId() % numParallelSubtasks;
    }

    AbstractLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> attributeDescriptors, ResultExtractor<OutputT> resultExtractor) {
        this.repositoryFactory = repositoryFactory;
        this.attributeDescriptors = attributeDescriptors;
        this.resultExtractor = resultExtractor;
    }

    public void open(Configuration parameters) {
        this.running = new CountDownLatch(1);
        this.cancelled = new CountDownLatch(1);
    }

    abstract ReaderT createLogReader(List<AttributeDescriptor<?>> var1);

    abstract List<Partition> getPartitions(ReaderT var1);

    abstract Partition getOffsetPartition(OffsetT var1);

    abstract Set<Partition> getSkipFirstElementFromPartitions(List<OffsetT> var1);

    abstract ObserverT createLogObserver(SourceFunction.SourceContext<OutputT> var1, ResultExtractor<OutputT> var2, Set<Partition> var3);

    abstract UnifiedObserveHandle<OffsetT> observePartitions(ReaderT var1, List<Partition> var2, List<AttributeDescriptor<?>> var3, ObserverT var4);

    abstract UnifiedObserveHandle<OffsetT> observeRestoredOffsets(ReaderT var1, List<OffsetT> var2, List<AttributeDescriptor<?>> var3, ObserverT var4);

    public void run(SourceFunction.SourceContext<OutputT> sourceContext) throws Exception {
        ReaderT reader = this.createLogReader(this.attributeDescriptors);
        List<Partition> partitions = this.getPartitions(reader).stream().filter(partition -> AbstractLogSourceFunction.getSubtaskIndex(partition, this.getRuntimeContext().getNumberOfParallelSubtasks()) == this.getRuntimeContext().getIndexOfThisSubtask()).collect(Collectors.toList());
        if (!partitions.isEmpty()) {
            ObserverT observer;
            if (this.restoredOffsets != null) {
                List filteredOffsets = this.restoredOffsets.stream().filter(offset -> AbstractLogSourceFunction.getSubtaskIndex(this.getOffsetPartition(offset), this.getRuntimeContext().getNumberOfParallelSubtasks()) == this.getRuntimeContext().getIndexOfThisSubtask()).collect(Collectors.toList());
                Set<Partition> skipFirstElement = this.getSkipFirstElementFromPartitions(filteredOffsets);
                observer = this.createLogObserver(sourceContext, this.resultExtractor, skipFirstElement);
                this.observeHandle = this.observeRestoredOffsets(reader, filteredOffsets, this.attributeDescriptors, observer);
            } else {
                observer = this.createLogObserver(sourceContext, this.resultExtractor, Collections.emptySet());
                this.observeHandle = this.observePartitions(reader, partitions, this.attributeDescriptors, observer);
            }
            log.info("Source [{}]: RUNNING", (Object)this);
            this.running.countDown();
            ((AbstractSourceLogObserver)observer).awaitCompleted();
            Optional<Throwable> maybeError = ((AbstractSourceLogObserver)observer).getError();
            if (maybeError.isPresent()) {
                log.error("Source [{}]: FAILED", (Object)this, (Object)maybeError.get());
                ExceptionUtils.rethrowAsIllegalStateException((Throwable)maybeError.get());
            } else {
                this.finishAndMarkAsIdle(sourceContext);
            }
        } else {
            this.running.countDown();
            this.finishAndMarkAsIdle(sourceContext);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void finishAndMarkAsIdle(SourceFunction.SourceContext<?> sourceContext) {
        log.info("Source [{}]: COMPLETED", (Object)this);
        Object object = sourceContext.getCheckpointLock();
        synchronized (object) {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
        }
        sourceContext.markAsTemporarilyIdle();
        while (this.cancelled.getCount() > 0L) {
            try {
                this.cancelled.await();
            }
            catch (InterruptedException e) {
                if (this.cancelled.getCount() != 0L) continue;
                Thread.currentThread().interrupt();
            }
        }
    }

    public void cancel() {
        this.cancelled.countDown();
    }

    public void close() {
        if (this.observeHandle != null) {
            Objects.requireNonNull(this.observeHandle).close();
        }
    }

    public void notifyCheckpointComplete(long l) {
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        Objects.requireNonNull(this.persistedOffsets).clear();
        if (this.observeHandle != null) {
            for (Serializable offset : this.observeHandle.getConsumedOffsets()) {
                this.persistedOffsets.add((Object)offset);
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.persistedOffsets = stateStore.getUnionListState(new ListStateDescriptor(OFFSETS_STATE_NAME, (TypeSerializer)new JavaSerializer()));
        if (context.isRestored()) {
            this.restoredOffsets = new ArrayList<OffsetT>();
            ((Iterable)Objects.requireNonNull(this.persistedOffsets).get()).forEach(this.restoredOffsets::add);
            log.info("BatchLog subtask {} restored state: {}.", (Object)this.getRuntimeContext().getIndexOfThisSubtask(), this.restoredOffsets);
        } else {
            log.info("BatchLog subtask {} has no state to restore.", (Object)this.getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    @VisibleForTesting
    void awaitRunning() throws InterruptedException {
        this.running.await();
    }

    @Generated
    public RepositoryFactory getRepositoryFactory() {
        return this.repositoryFactory;
    }

    public static interface UnifiedObserveHandle<OffsetT>
    extends Closeable {
        public List<OffsetT> getConsumedOffsets();

        @Override
        public void close();
    }
}

