/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.core;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.core.storage.commitlog.Position;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.core.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.commitlog.ObserveHandle;
import cz.o2.proxima.direct.core.commitlog.Offset;
import cz.o2.proxima.flink.core.AbstractLogSourceFunction;
import cz.o2.proxima.flink.core.AbstractSourceLogObserver;
import cz.o2.proxima.flink.core.ResultExtractor;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<CommitLogReader, LogObserver<OutputT>, Offset, CommitLogObserver.OnNextContext, OutputT> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(CommitLogSourceFunction.class);

    public CommitLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> attributeDescriptors, ResultExtractor<OutputT> resultExtractor) {
        super(repositoryFactory, attributeDescriptors, resultExtractor);
    }

    @Override
    CommitLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors) {
        return (CommitLogReader)((DirectDataOperator)this.getRepositoryFactory().apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0])).getCommitLogReader(attributeDescriptors).orElseThrow(() -> new IllegalStateException(String.format("Unable to find commit log reader for [%s].", attributeDescriptors)));
    }

    @Override
    List<Partition> getPartitions(CommitLogReader reader) {
        return reader.getPartitions();
    }

    @Override
    Partition getOffsetPartition(Offset offset) {
        return offset.getPartition();
    }

    @Override
    Set<Partition> getSkipFirstElementFromPartitions(List<Offset> offsets) {
        return offsets.stream().map(Offset::getPartition).collect(Collectors.toSet());
    }

    @Override
    LogObserver<OutputT> createLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElement) {
        return new LogObserver<OutputT>(sourceContext, resultExtractor, skipFirstElement);
    }

    @Override
    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(CommitLogReader reader, List<Partition> partitions, List<AttributeDescriptor<?>> attributeDescriptors, LogObserver<OutputT> observer) {
        final ObserveHandle commitLogHandle = reader.observeBulkPartitions(partitions, Position.OLDEST, false, observer);
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>(){

            @Override
            public List<Offset> getConsumedOffsets() {
                return commitLogHandle.getCurrentOffsets();
            }

            @Override
            public void close() {
                commitLogHandle.close();
            }
        };
    }

    @Override
    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(CommitLogReader reader, List<Offset> offsets, List<AttributeDescriptor<?>> attributeDescriptors, LogObserver<OutputT> observer) {
        final ObserveHandle delegate = reader.observeBulkOffsets(offsets, false, observer);
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>(){

            @Override
            public List<Offset> getConsumedOffsets() {
                return delegate.getCurrentOffsets();
            }

            @Override
            public void close() {
                delegate.close();
            }
        };
    }

    static class LogObserver<OutputT>
    extends AbstractSourceLogObserver<Offset, CommitLogObserver.OnNextContext, OutputT>
    implements CommitLogObserver {
        LogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElementFromEachPartition) {
            super(sourceContext, resultExtractor, skipFirstElementFromEachPartition);
        }

        public void onIdle(CommitLogObserver.OnIdleContext context) {
            this.maybeUpdateWatermark(context.getWatermark());
        }

        @Override
        void markOffsetAsConsumed(CommitLogObserver.OnNextContext context) {
        }
    }
}

