package cz.o2.proxima.beam.direct.io;

import cz.o2.proxima.beam.core.io.StreamElementCoder;
import cz.o2.proxima.direct.batch.BatchLogObservable;
import cz.o2.proxima.direct.batch.BatchLogObserver;
import cz.o2.proxima.direct.core.Partition;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.RepositoryFactory;
import cz.o2.proxima.storage.StreamElement;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/direct/io/DirectBatchUnboundedSource.class */
/**
 * Beam {@link UnboundedSource} emitting {@link StreamElement}s that are read
 * from a {@link BatchLogObservable}.
 *
 * <p>An instance created through {@code of(...)} holds no partitions; {@code split}
 * asks the reader for the partitions between {@code startStamp} and
 * {@code endStamp} and distributes them among child sources.
 */
public class DirectBatchUnboundedSource extends UnboundedSource<StreamElement, Checkpoint> {
    private static final Logger log = LoggerFactory.getLogger(DirectBatchUnboundedSource.class);
    // Passed to StreamElementCoder.of(...) to build the output coder.
    private final RepositoryFactory factory;
    // Provides the partitions in split() and is handed to every created reader.
    private final BatchLogObservable reader;
    // Attributes handed to every created reader.
    private final List<AttributeDescriptor<?>> attributes;
    // Empty for a source that was not split yet, otherwise the partitions
    // assigned to this split, sorted by partitionsComparator().
    private final List<Partition> partitions;
    // Bounds passed to BatchLogObservable#getPartitions when splitting.
    private final long startStamp;
    private final long endStamp;

    /* loaded from: input_file:cz/o2/proxima/beam/direct/io/DirectBatchUnboundedSource$Checkpoint.class */
    /**
     * Serializable checkpoint mark holding a list of partitions together with
     * a {@code skipFromFirst} counter.
     *
     * <p>Readers of this source create it from their list of partitions still
     * to be processed and the number of elements consumed from the current one.
     */
    public static class Checkpoint implements UnboundedSource.CheckpointMark, Serializable {
        private final List<Partition> partitions;
        private final long skipFromFirst;

        /**
         * Creates the checkpoint, storing a private copy of the partitions.
         *
         * @param remaining partitions to remember; copied, must not be {@code null}
         * @param toSkip value stored as {@code skipFromFirst}
         */
        Checkpoint(List<Partition> remaining, long toSkip) {
            this.partitions = new ArrayList<>(remaining);
            this.skipFromFirst = toSkip;
        }

        /** Nothing has to be committed when the checkpoint is finalized. */
        public void finalizeCheckpoint() {
            // intentionally empty
        }

        public String toString() {
            StringBuilder text = new StringBuilder("DirectBatchUnboundedSource.Checkpoint(partitions=");
            text.append(getPartitions());
            text.append(", skipFromFirst=");
            text.append(getSkipFromFirst());
            text.append(")");
            return text.toString();
        }

        /** @return the stored partition list (the internal instance, not a copy) */
        public List<Partition> getPartitions() {
            return partitions;
        }

        /** @return the stored {@code skipFromFirst} value */
        public long getSkipFromFirst() {
            return skipFromFirst;
        }
    }

    /* loaded from: input_file:cz/o2/proxima/beam/direct/io/DirectBatchUnboundedSource$CheckpointCoder.class */
    /**
     * Coder of {@link Checkpoint} instances.
     *
     * <p>Wire format: a 4-byte big-endian length followed by that many bytes of
     * the GZIP-compressed Java serialization of the checkpoint.
     */
    public static class CheckpointCoder extends Coder<Checkpoint> {
        /**
         * Writes the length-prefixed, GZIP-compressed serialized checkpoint.
         *
         * <p>The target stream is neither closed nor wrapped in a buffering
         * stream, so nothing beyond the encoded value is written or retained.
         *
         * @param checkpoint value to encode
         * @param outputStream stream to append the encoded value to
         * @throws IOException when serialization or writing fails
         */
        public void encode(Checkpoint checkpoint, OutputStream outputStream) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Closing the streams finishes the GZIP trailer and releases the
            // native Deflater immediately instead of waiting for GC.
            try (GZIPOutputStream gzip = new GZIPOutputStream(buffer);
                    ObjectOutputStream objects = new ObjectOutputStream(gzip)) {
                objects.writeObject(checkpoint);
            }
            byte[] payload = buffer.toByteArray();
            DataOutputStream target = new DataOutputStream(outputStream);
            target.writeInt(payload.length);
            target.write(payload);
        }

        /**
         * Reads a checkpoint written by {@link #encode}.
         *
         * <p>NOTE(review): this uses Java native deserialization, which is only
         * safe when the encoded bytes come from a trusted source.
         *
         * @param inputStream stream positioned at the length prefix
         * @return the decoded checkpoint
         * @throws CoderException when the length prefix is negative or the
         *     serialized class cannot be found
         * @throws IOException when reading or decompression fails
         */
        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Checkpoint m58decode(InputStream inputStream) throws CoderException, IOException {
            DataInputStream source = new DataInputStream(inputStream);
            int length = source.readInt();
            if (length < 0) {
                throw new CoderException("Invalid encoded checkpoint length " + length);
            }
            byte[] payload = new byte[length];
            source.readFully(payload);
            // Both streams are resources so that the Inflater is released even
            // when the ObjectInputStream header cannot be read.
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload));
                    ObjectInputStream objects = new ObjectInputStream(gzip)) {
                return (Checkpoint) objects.readObject();
            } catch (ClassNotFoundException e) {
                throw new CoderException(e);
            }
        }

        /** @return an empty list, this coder has no component coders */
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.emptyList();
        }

        /**
         * Never throws, i.e. the coder reports itself as deterministic.
         *
         * <p>NOTE(review): the encoding relies on Java serialization and GZIP;
         * confirm that this is deterministic enough for the intended use.
         */
        public void verifyDeterministic() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/o2/proxima/beam/direct/io/DirectBatchUnboundedSource$StreamElementUnboundedReader.class */
    /**
     * {@link UnboundedSource.UnboundedReader} of {@link StreamElement}s over a
     * list of partitions.
     *
     * <p>NOTE(review): {@link #advance()} could not be decompiled, so the way the
     * queue, the {@code running} flag, {@code skip}, {@code watermark} and the
     * partitions are used is not visible in this file. The comments below state
     * only what the remaining code shows.
     */
    public static class StreamElementUnboundedReader extends UnboundedSource.UnboundedReader<StreamElement> {
        // Source returned from m59getCurrentSource().
        private final UnboundedSource<StreamElement, ?> source;
        private final BatchLogObservable reader;
        private final List<AttributeDescriptor<?>> attributes;
        // Private copy of the partitions passed to the constructor; reported in
        // checkpoints created by getCheckpointMark().
        private final List<Partition> toProcess;
        // Initialized from Checkpoint#skipFromFirst, or 0 when there is no checkpoint.
        long skip;
        // Value returned from getWatermark(); not assigned anywhere in the visible code.
        Instant watermark;
        // Bounded buffer with capacity of 100 elements.
        private final BlockingQueue<StreamElement> queue = new ArrayBlockingQueue(100);
        private final AtomicBoolean running = new AtomicBoolean();
        // Set to true by close().
        private final AtomicBoolean stopped = new AtomicBoolean();

        // Element returned from m60getCurrent(); null until assigned.
        @Nullable
        StreamElement current = null;

        @Nullable
        Partition runningPartition = null;
        // Stored as skipFromFirst of checkpoints created by getCheckpointMark().
        long consumedFromCurrent = 0;

        /**
         * Creates the reader.
         *
         * @param unboundedSource source that created this reader, must not be {@code null}
         * @param batchLogObservable underlying reader, must not be {@code null}
         * @param list attributes, copied, must not be {@code null}
         * @param checkpoint checkpoint to take the initial {@code skip} from, may be {@code null}
         * @param list2 partitions to process, copied, must not be {@code null}
         */
        public StreamElementUnboundedReader(UnboundedSource<StreamElement, ?> unboundedSource, BatchLogObservable batchLogObservable, List<AttributeDescriptor<?>> list, @Nullable Checkpoint checkpoint, List<Partition> list2) {
            this.source = (UnboundedSource) Objects.requireNonNull(unboundedSource);
            this.reader = (BatchLogObservable) Objects.requireNonNull(batchLogObservable);
            this.attributes = new ArrayList((Collection) Objects.requireNonNull(list));
            this.toProcess = new ArrayList((Collection) Objects.requireNonNull(list2));
            // Without a checkpoint nothing is skipped.
            this.skip = checkpoint == null ? 0L : checkpoint.skipFromFirst;
        }

        /** Starts reading by delegating to {@link #advance()}. */
        public boolean start() {
            return advance();
        }

        /*
         * JADX ERROR: Failed to decode insn: 0x00C9: MOVE_MULTI, method:
         * cz.o2.proxima.beam.direct.io.DirectBatchUnboundedSource.StreamElementUnboundedReader.advance():boolean
         * (java.lang.ArrayIndexOutOfBoundsException while decoding a dup2x1 instruction).
         *
         * The original logic of this method is therefore missing from this file;
         * the body below is a decompiler placeholder that always throws.
         */
        public boolean advance() {
            /*
                Method dump skipped, instructions count: 225
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: cz.o2.proxima.beam.direct.io.DirectBatchUnboundedSource.StreamElementUnboundedReader.advance():boolean");
        }

        /** @return the current value of the {@code watermark} field */
        public Instant getWatermark() {
            return this.watermark;
        }

        /**
         * @return new {@link Checkpoint} built from the partitions in {@code toProcess}
         *     and the current {@code consumedFromCurrent} counter
         */
        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return new Checkpoint(this.toProcess, this.consumedFromCurrent);
        }

        /* renamed from: getCurrentSource, reason: merged with bridge method [inline-methods] */
        public UnboundedSource<StreamElement, ?> m59getCurrentSource() {
            return this.source;
        }

        /**
         * Returns the {@code current} field as is; despite the {@code throws}
         * clause this yields {@code null} rather than an exception when no
         * element is set.
         */
        /* renamed from: getCurrent, reason: merged with bridge method [inline-methods] */
        public StreamElement m60getCurrent() throws NoSuchElementException {
            return this.current;
        }

        /**
         * @return stamp of the current element, or
         *     {@link BoundedWindow#TIMESTAMP_MIN_VALUE} when there is none
         */
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.current == null ? BoundedWindow.TIMESTAMP_MIN_VALUE : new Instant(this.current.getStamp());
        }

        /** Raises the {@code stopped} flag; nothing else is released here. */
        public void close() {
            this.stopped.set(true);
        }
    }

    /**
     * Creates a source that has no partitions assigned yet.
     *
     * @param factory factory later used to create the output coder
     * @param reader reader providing the partitions and data
     * @param attributes attributes handed to created readers
     * @param startStamp first bound passed to the reader when listing partitions
     * @param endStamp second bound passed to the reader when listing partitions
     * @return the new source
     */
    public static DirectBatchUnboundedSource of(RepositoryFactory factory, BatchLogObservable reader, List<AttributeDescriptor<?>> attributes, long startStamp, long endStamp) {
        final DirectBatchUnboundedSource unsplit =
                new DirectBatchUnboundedSource(factory, reader, attributes, startStamp, endStamp);
        return unsplit;
    }

    /**
     * Creates a source without assigned partitions.
     *
     * @param factory factory later used to create the output coder
     * @param reader reader providing the partitions and data
     * @param attributes attributes handed to created readers; copied, must not be {@code null}
     * @param startStamp first bound passed to the reader when listing partitions
     * @param endStamp second bound passed to the reader when listing partitions
     */
    private DirectBatchUnboundedSource(RepositoryFactory factory, BatchLogObservable reader, List<AttributeDescriptor<?>> attributes, long startStamp, long endStamp) {
        this.factory = factory;
        this.reader = reader;
        // Copy before wrapping: unmodifiableList alone is only a view, so later
        // changes to the caller's list would otherwise leak into this source.
        this.attributes = Collections.unmodifiableList(new ArrayList<>(attributes));
        this.partitions = Collections.emptyList();
        this.startStamp = startStamp;
        this.endStamp = endStamp;
    }

    /**
     * Creates a split of {@code parent} responsible for the given partitions.
     *
     * <p>The partitions are copied and sorted with {@link #partitionsComparator()}
     * before being stored as an unmodifiable list.
     *
     * @param parent source to take factory, reader and attributes from
     * @param assigned partitions of this split; copied, must not be {@code null}
     * @param startStamp start stamp stored in the new source
     * @param endStamp end stamp stored in the new source
     */
    private DirectBatchUnboundedSource(DirectBatchUnboundedSource parent, List<Partition> assigned, long startStamp, long endStamp) {
        this.factory = parent.factory;
        this.reader = parent.reader;
        this.attributes = parent.attributes;
        this.startStamp = startStamp;
        this.endStamp = endStamp;
        // Typed (not raw) list so that the stream below is a Stream<Partition>.
        List<Partition> sorted = new ArrayList<>(assigned);
        sorted.sort(partitionsComparator());
        this.partitions = Collections.unmodifiableList(sorted);
        if (log.isDebugEnabled()) {
            // Guarded so that the timestamp list is built only when it is logged.
            log.debug("Created source with partition min timestamps {}", sorted.stream().map(Partition::getMinTimestamp).collect(Collectors.toList()));
        }
    }

    /**
     * Splits this source into child sources, each owning a subset of partitions.
     *
     * <p>A source that already has partitions assigned is returned as the only
     * split. Otherwise the partitions reported by the reader for the configured
     * stamps are dealt round-robin into at most {@code desiredNumSplits} groups;
     * no empty group is ever created.
     *
     * @param desiredNumSplits requested number of splits; values below 1 are
     *     treated as 1
     * @param pipelineOptions unused
     * @return the splits, empty when the reader reports no partitions
     */
    public List<? extends UnboundedSource<StreamElement, Checkpoint>> split(int desiredNumSplits, PipelineOptions pipelineOptions) throws Exception {
        if (!this.partitions.isEmpty()) {
            return Collections.singletonList(this);
        }
        // Guards the modulo below against division by zero.
        final int maxSplits = Math.max(1, desiredNumSplits);
        List<List<Partition>> groups = new ArrayList<>();
        int slot = 0;
        for (Partition partition : this.reader.getPartitions(this.startStamp, this.endStamp)) {
            if (groups.size() <= slot) {
                // Groups are created lazily, so fewer partitions than requested
                // splits yields fewer groups.
                groups.add(new ArrayList<>());
            }
            groups.get(slot).add(partition);
            slot = (slot + 1) % maxSplits;
        }
        return groups.stream()
                .map(group -> new DirectBatchUnboundedSource(this, group, this.startStamp, this.endStamp))
                .collect(Collectors.toList());
    }

    /**
     * Creates a reader over the partitions of the checkpoint, or over the
     * partitions of this source when no checkpoint is given.
     *
     * @param pipelineOptions unused
     * @param checkpoint checkpoint to restore from, may be {@code null}
     * @return the new reader
     */
    public UnboundedSource.UnboundedReader<StreamElement> createReader(PipelineOptions pipelineOptions, Checkpoint checkpoint) throws IOException {
        final List<Partition> remaining;
        if (checkpoint != null) {
            remaining = checkpoint.partitions;
        } else {
            remaining = this.partitions;
        }
        // The reader receives a synchronized copy, never the original list.
        List<Partition> toProcess = Collections.synchronizedList(new ArrayList<>(remaining));
        return new StreamElementUnboundedReader(this, this.reader, this.attributes, checkpoint, toProcess);
    }

    /** @return a new {@link CheckpointCoder} on every call */
    public Coder<Checkpoint> getCheckpointMarkCoder() {
        final Coder<Checkpoint> coder = new CheckpointCoder();
        return coder;
    }

    /** @return coder of the emitted elements, created from this source's factory */
    public Coder<StreamElement> getOutputCoder() {
        final Coder<StreamElement> coder = StreamElementCoder.of(this.factory);
        return coder;
    }

    /**
     * Comparator ordering partitions by their minimal timestamp; partitions with
     * equal minimal timestamps are ordered by their maximal timestamp.
     *
     * @return the comparator
     */
    @VisibleForTesting
    static Comparator<Partition> partitionsComparator() {
        Comparator<Partition> byMinStamp = Comparator.<Partition>comparingLong(Partition::getMinTimestamp);
        return byMinStamp.thenComparingLong(Partition::getMaxTimestamp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates an observer that hands every received element over to a queue.
     *
     * @param blockingQueue queue receiving the elements
     * @param running flag that is cleared once the observation completes
     * @param stopped flag that, once set, makes the observer give up offering
     *     elements and refuse further input
     * @return the observer
     */
    public static BatchLogObserver asObserver(final BlockingQueue<StreamElement> blockingQueue, final AtomicBoolean running, final AtomicBoolean stopped) {
        return new BatchLogObserver() { // from class: cz.o2.proxima.beam.direct.io.DirectBatchUnboundedSource.1
            /**
             * Offers the element in 100 ms attempts until it is accepted.
             *
             * @return {@code true} when the element was queued, {@code false} when
             *     the stop flag was raised or the thread was interrupted
             */
            public boolean onNext(StreamElement streamElement) {
                try {
                    boolean accepted = false;
                    // Retry with a timeout so that the stop flag is re-checked
                    // regularly while the queue is full.
                    while (!accepted && !stopped.get()) {
                        accepted = blockingQueue.offer(streamElement, 100L, TimeUnit.MILLISECONDS);
                    }
                    return accepted;
                } catch (InterruptedException e) {
                    DirectBatchUnboundedSource.log.warn("Interrupted while reading data.", e);
                    // Restore the interrupt status for the calling thread.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }

            /** Clears the running flag. */
            public void onCompleted() {
                running.set(false);
            }

            /** Rethrows any error wrapped in a {@link RuntimeException}. */
            public boolean onError(Throwable th) {
                throw new RuntimeException(th);
            }
        };
    }

    /** Compiler-generated accessor forwarding to {@code asObserver}. */
    static /* synthetic */ BatchLogObserver access$300(BlockingQueue blockingQueue, AtomicBoolean atomicBoolean, AtomicBoolean atomicBoolean2) {
        final BatchLogObserver observer = asObserver(blockingQueue, atomicBoolean, atomicBoolean2);
        return observer;
    }
}
