/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.flink.core;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.repository.AttributeDescriptor;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.Partition;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.batch.BatchLogReader;
import cz.o2.proxima.direct.core.batch.ObserveHandle;
import cz.o2.proxima.direct.core.batch.Offset;
import cz.o2.proxima.flink.core.AbstractLogSourceFunction;
import cz.o2.proxima.flink.core.AbstractSourceLogObserver;
import cz.o2.proxima.flink.core.ResultExtractor;
import cz.o2.proxima.flink.core.batch.OffsetTrackingBatchLogReader;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchLogSourceFunction<OutputT>
extends AbstractLogSourceFunction<BatchLogReader, LogObserver<OutputT>, Offset, BatchLogObserver.OnNextContext, OutputT> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BatchLogSourceFunction.class);

    public BatchLogSourceFunction(RepositoryFactory repositoryFactory, List<AttributeDescriptor<?>> attributeDescriptors, ResultExtractor<OutputT> resultExtractor) {
        super(repositoryFactory, attributeDescriptors, resultExtractor);
    }

    @Override
    BatchLogReader createLogReader(List<AttributeDescriptor<?>> attributeDescriptors) {
        BatchLogReader batchLogReader = (BatchLogReader)((DirectDataOperator)this.getRepositoryFactory().apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0])).getBatchLogReader(attributeDescriptors).orElseThrow(() -> new IllegalStateException(String.format("Unable to find batch log reader for [%s].", attributeDescriptors)));
        return OffsetTrackingBatchLogReader.of(batchLogReader);
    }

    @Override
    List<Partition> getPartitions(BatchLogReader reader) {
        return reader.getPartitions();
    }

    @Override
    Partition getOffsetPartition(Offset offset) {
        return offset.getPartition();
    }

    @Override
    Set<Partition> getSkipFirstElementFromPartitions(List<Offset> offsets) {
        return offsets.stream().filter(offset -> offset.getElementIndex() >= 0L).map(Offset::getPartition).collect(Collectors.toSet());
    }

    @Override
    LogObserver<OutputT> createLogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElement) {
        return new LogObserver<OutputT>(sourceContext, resultExtractor, skipFirstElement);
    }

    @Override
    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observeRestoredOffsets(BatchLogReader reader, List<Offset> offsets, List<AttributeDescriptor<?>> attributeDescriptors, LogObserver<OutputT> observer) {
        final OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle delegate = (OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle)reader.observeOffsets(offsets, attributeDescriptors, this.wrapSourceObserver(observer));
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>(){

            @Override
            public List<Offset> getConsumedOffsets() {
                return delegate.getCurrentOffsets().stream().filter(offset -> !offset.isLast()).collect(Collectors.toList());
            }

            @Override
            public void close() {
                delegate.close();
            }
        };
    }

    @Override
    AbstractLogSourceFunction.UnifiedObserveHandle<Offset> observePartitions(BatchLogReader reader, List<Partition> partitions, List<AttributeDescriptor<?>> attributeDescriptors, LogObserver<OutputT> observer) {
        ObserveHandle batchReaderHandle = reader.observe(partitions, attributeDescriptors, this.wrapSourceObserver(observer));
        final OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle offsetTrackingHandle = (OffsetTrackingBatchLogReader.OffsetTrackingObserveHandle)batchReaderHandle;
        return new AbstractLogSourceFunction.UnifiedObserveHandle<Offset>(){

            @Override
            public List<Offset> getConsumedOffsets() {
                return offsetTrackingHandle.getCurrentOffsets().stream().filter(offset -> !offset.isLast()).collect(Collectors.toList());
            }

            @Override
            public void close() {
                offsetTrackingHandle.close();
            }
        };
    }

    @VisibleForTesting
    BatchLogObserver wrapSourceObserver(BatchLogObserver sourceObserver) {
        return sourceObserver;
    }

    static class LogObserver<OutputT>
    extends AbstractSourceLogObserver<Offset, BatchLogObserver.OnNextContext, OutputT>
    implements BatchLogObserver {
        LogObserver(SourceFunction.SourceContext<OutputT> sourceContext, ResultExtractor<OutputT> resultExtractor, Set<Partition> skipFirstElementFromEachPartition) {
            super(sourceContext, resultExtractor, skipFirstElementFromEachPartition);
        }

        @Override
        void markOffsetAsConsumed(BatchLogObserver.OnNextContext context) {
            OffsetTrackingBatchLogReader.OffsetCommitter committer = (OffsetTrackingBatchLogReader.OffsetCommitter)context;
            committer.markOffsetAsConsumed();
        }
    }
}

