/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.core.DataAccessor;
import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.beam.direct.io.DirectBatchSource;
import cz.o2.proxima.beam.direct.io.DirectBatchUnboundedSource;
import cz.o2.proxima.beam.direct.io.DirectBoundedSource;
import cz.o2.proxima.beam.direct.io.DirectUnboundedSource;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.core.Context;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import java.net.URI;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.AssignEventTime;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.OutputHint;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * Beam {@link DataAccessor} that delegates to a direct
 * {@link cz.o2.proxima.direct.core.DataAccessor} and exposes its readers as
 * {@link PCollection}s of {@link StreamElement}.
 *
 * <p>Every produced collection gets a {@link StreamElementCoder} built from the
 * stored {@link RepositoryFactory} and a {@code StreamElement} type descriptor.
 */
public class DirectDataAccessorWrapper implements DataAccessor {
    private static final long serialVersionUID = 1L;

    /** Factory obtained from the repository; handed to sources and coders. */
    private final RepositoryFactory factory;
    /** Direct accessor the readers are obtained from. */
    private final cz.o2.proxima.direct.core.DataAccessor direct;
    /** URI used only to label the applied read transforms. */
    private final URI uri;
    /** Context passed to the direct accessor when requesting readers. */
    private final Context context;

    /**
     * Creates the wrapper.
     *
     * @param repo repository whose factory ({@code repo.asFactory()}) is retained
     * @param direct direct accessor to delegate to
     * @param uri URI appended to the names of the read transforms
     * @param context context used when asking {@code direct} for readers
     */
    public DirectDataAccessorWrapper(Repository repo, cz.o2.proxima.direct.core.DataAccessor direct, URI uri, Context context) {
        this.factory = repo.asFactory();
        this.direct = direct;
        this.uri = uri;
        this.context = context;
    }

    /**
     * Creates a stream read from the commit log of the wrapped accessor.
     *
     * @param name name forwarded to the created source
     * @param pipeline pipeline the read transform is applied to
     * @param position position forwarded to the created source
     * @param stopAtCurrent when {@code true} a bounded source is used, otherwise an unbounded one
     * @param eventTime forwarded to the unbounded source only; not used for the bounded read
     * @param limit limit forwarded to the created source
     * @return the resulting collection with coder and type descriptor set
     * @throws IllegalArgumentException if the wrapped accessor provides no commit log reader
     */
    @Override
    public PCollection<StreamElement> createStream(String name, Pipeline pipeline, Position position, boolean stopAtCurrent, boolean eventTime, long limit) {
        CommitLogReader reader = this.direct
            .getCommitLogReader(this.context)
            .orElseThrow(() -> new IllegalArgumentException("Cannot create commit log from " + this.direct));

        final PCollection<StreamElement> ret;
        if (stopAtCurrent) {
            ret = pipeline.apply(
                "ReadBounded:" + this.uri,
                Read.from(DirectBoundedSource.of(this.factory, name, reader, position, limit)));
        } else {
            ret = pipeline.apply(
                "ReadUnbounded:" + this.uri,
                Read.from(DirectUnboundedSource.of(this.factory, name, reader, position, eventTime, limit)));
        }
        return ret
            .setCoder(StreamElementCoder.of(this.factory))
            .setTypeDescriptor(TypeDescriptor.of(StreamElement.class));
    }

    /**
     * Creates a bounded collection read from the batch log of the wrapped accessor
     * and assigns each element's event time from {@link StreamElement#getStamp()}.
     *
     * @param pipeline pipeline the read transform is applied to
     * @param attrs attributes forwarded to the batch source
     * @param startStamp start stamp forwarded to the batch source
     * @param endStamp end stamp forwarded to the batch source
     * @return the time-assigned collection with coder and type descriptor set
     * @throws IllegalArgumentException if the wrapped accessor provides no batch log observable
     */
    @Override
    public PCollection<StreamElement> createBatch(Pipeline pipeline, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp) {
        BatchLogObservable reader = this.direct
            .getBatchLogObservable(this.context)
            .orElseThrow(() -> new IllegalArgumentException("Cannot create batch observable from " + this.direct));

        PCollection<StreamElement> ret = pipeline.apply(
            Read.from(DirectBatchSource.of(this.factory, reader, attrs, startStamp, endStamp)));

        // The coder must be set on the raw read before it is reused below via ret.getCoder().
        ret.setTypeDescriptor(TypeDescriptor.of(StreamElement.class))
            .setCoder(StreamElementCoder.of(this.factory));

        return AssignEventTime.of(ret)
            .using(StreamElement::getStamp)
            .output()
            .setCoder(ret.getCoder())
            .setTypeDescriptor(TypeDescriptor.of(StreamElement.class));
    }

    /**
     * Creates an unbounded collection read from the batch log of the wrapped accessor.
     *
     * @param pipeline pipeline the read transform is applied to
     * @param attrs attributes forwarded to the source
     * @param startStamp start stamp forwarded to the source
     * @param endStamp end stamp forwarded to the source
     * @param limit accepted for interface compatibility; NOTE(review): this value is not
     *     forwarded to {@code DirectBatchUnboundedSource.of} here - confirm whether that is intended
     * @return the resulting collection with coder and type descriptor set
     * @throws IllegalArgumentException if the wrapped accessor provides no batch log observable
     */
    @Override
    public PCollection<StreamElement> createStreamFromUpdates(Pipeline pipeline, List<AttributeDescriptor<?>> attrs, long startStamp, long endStamp, long limit) {
        BatchLogObservable reader = this.direct
            .getBatchLogObservable(this.context)
            .orElseThrow(() -> new IllegalArgumentException("Cannot create batch observable from " + this.direct));

        PCollection<StreamElement> ret = pipeline.apply(
            "ReadBatchUnbounded:" + this.uri,
            Read.from(DirectBatchUnboundedSource.of(this.factory, reader, attrs, startStamp, endStamp)));

        return ret
            .setCoder(StreamElementCoder.of(this.factory))
            .setTypeDescriptor(TypeDescriptor.of(StreamElement.class));
    }
}

