package io.confluent.parallelconsumer;

import io.confluent.csid.utils.Range;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/confluent/parallelconsumer/OffsetSimultaneousEncoder.class */
public class OffsetSimultaneousEncoder {
    private static final Logger log = LoggerFactory.getLogger(OffsetSimultaneousEncoder.class);
    public static final int LARGE_INPUT_MAP_SIZE_THRESHOLD = 200;
    private final Set<Long> incompleteOffsets;
    private final long lowWaterMark;
    private final long nextExpectedOffset;
    Map<OffsetEncoding, byte[]> encodingMap = new EnumMap(OffsetEncoding.class);
    TreeSet<EncodedOffsetPair> sortedEncodings = new TreeSet<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/parallelconsumer/OffsetSimultaneousEncoder$BitsetEncoder.class */
    public class BitsetEncoder extends Encoder {
        private final ByteBuffer wrappedBitsetBytesBuffer;
        private final BitSet bitSet;
        private Optional<byte[]> encodedBytes;

        public BitsetEncoder(int i) {
            super();
            this.encodedBytes = Optional.empty();
            this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(2 + (i / 8) + 1);
            this.wrappedBitsetBytesBuffer.putShort((short) i);
            this.bitSet = new BitSet(i);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingType() {
            return OffsetEncoding.BitSet;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingTypeCompressed() {
            return OffsetEncoding.BitSetCompressed;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void containsIndex(int i) {
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void doesNotContainIndex(int i) {
            this.bitSet.set(i);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public byte[] serialise() {
            this.wrappedBitsetBytesBuffer.put(this.bitSet.toByteArray());
            byte[] array = this.wrappedBitsetBytesBuffer.array();
            this.encodedBytes = Optional.of(array);
            return array;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public int getEncodedSize() {
            return this.encodedBytes.get().length;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected byte[] getEncodedBytes() {
            return this.encodedBytes.get();
        }
    }

    /* loaded from: input_file:io/confluent/parallelconsumer/OffsetSimultaneousEncoder$ByteBufferEncoder.class */
    private class ByteBufferEncoder extends Encoder {
        private final ByteBuffer bytesBuffer;

        public ByteBufferEncoder(int i) {
            super();
            this.bytesBuffer = ByteBuffer.allocate(1 + i);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingType() {
            return OffsetEncoding.ByteArray;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingTypeCompressed() {
            return OffsetEncoding.ByteArrayCompressed;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void containsIndex(int i) {
            this.bytesBuffer.put((byte) 0);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void doesNotContainIndex(int i) {
            this.bytesBuffer.put((byte) 1);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public byte[] serialise() {
            return this.bytesBuffer.array();
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public int getEncodedSize() {
            return this.bytesBuffer.capacity();
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected byte[] getEncodedBytes() {
            return this.bytesBuffer.array();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/parallelconsumer/OffsetSimultaneousEncoder$Encoder.class */
    public abstract class Encoder {
        private Encoder() {
        }

        protected abstract OffsetEncoding getEncodingType();

        protected abstract OffsetEncoding getEncodingTypeCompressed();

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract void containsIndex(int i);

        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract void doesNotContainIndex(int i);

        abstract byte[] serialise();

        abstract int getEncodedSize();

        boolean quiteSmall() {
            return getEncodedSize() < 200;
        }

        byte[] compress() throws IOException {
            return OffsetSimpleSerialisation.compressZstd(getEncodedBytes());
        }

        final void register() {
            register(getEncodingType(), serialise());
        }

        private void register(OffsetEncoding offsetEncoding, byte[] bArr) {
            OffsetSimultaneousEncoder.this.sortedEncodings.add(new EncodedOffsetPair(offsetEncoding, ByteBuffer.wrap(bArr)));
            OffsetSimultaneousEncoder.this.encodingMap.put(offsetEncoding, bArr);
        }

        void registerCompressed() {
            register(getEncodingTypeCompressed(), compress());
        }

        protected abstract byte[] getEncodedBytes();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/parallelconsumer/OffsetSimultaneousEncoder$RunLengthEncoder.class */
    public class RunLengthEncoder extends Encoder {
        private final AtomicInteger currentRunLengthCount;
        private final AtomicBoolean previousRunLengthState;
        private final List<Integer> runLengthEncodingIntegers;
        private Optional<byte[]> encodedBytes;

        public RunLengthEncoder() {
            super();
            this.encodedBytes = Optional.empty();
            this.currentRunLengthCount = new AtomicInteger();
            this.previousRunLengthState = new AtomicBoolean(false);
            this.runLengthEncodingIntegers = new ArrayList();
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingType() {
            return OffsetEncoding.RunLength;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected OffsetEncoding getEncodingTypeCompressed() {
            return OffsetEncoding.RunLengthCompressed;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void containsIndex(int i) {
            encodeRunLength(false);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public void doesNotContainIndex(int i) {
            encodeRunLength(true);
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public byte[] serialise() {
            this.runLengthEncodingIntegers.add(Integer.valueOf(this.currentRunLengthCount.get()));
            ByteBuffer allocate = ByteBuffer.allocate(this.runLengthEncodingIntegers.size() * 2);
            Iterator<Integer> it = this.runLengthEncodingIntegers.iterator();
            while (it.hasNext()) {
                allocate.putShort(it.next().shortValue());
            }
            byte[] array = allocate.array();
            this.encodedBytes = Optional.of(array);
            return array;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        public int getEncodedSize() {
            return this.encodedBytes.get().length;
        }

        @Override // io.confluent.parallelconsumer.OffsetSimultaneousEncoder.Encoder
        protected byte[] getEncodedBytes() {
            return this.encodedBytes.get();
        }

        private void encodeRunLength(boolean z) {
            if (this.previousRunLengthState.get() == z) {
                this.currentRunLengthCount.getAndIncrement();
                return;
            }
            this.previousRunLengthState.set(z);
            this.runLengthEncodingIntegers.add(Integer.valueOf(this.currentRunLengthCount.get()));
            this.currentRunLengthCount.set(1);
        }
    }

    public OffsetSimultaneousEncoder(long j, Long l, Set<Long> set) {
        this.lowWaterMark = j;
        this.nextExpectedOffset = l.longValue();
        this.incompleteOffsets = set;
    }

    public OffsetSimultaneousEncoder invoke() {
        log.trace("Starting encode of incompletes of {}, base offset is: {}", this.incompleteOffsets, Long.valueOf(this.lowWaterMark));
        int i = (int) (this.nextExpectedOffset - this.lowWaterMark);
        if (i > 200) {
            log.debug("~Large input map size: {}", Integer.valueOf(i));
        }
        HashSet hashSet = new HashSet();
        hashSet.add(new BitsetEncoder(i));
        hashSet.add(new RunLengthEncoder());
        log.debug("Encode loop offset start,end: [{},{}] length: {}", new Object[]{Long.valueOf(this.lowWaterMark), Long.valueOf(this.nextExpectedOffset), Integer.valueOf(i)});
        Range.range(i).forEach(num -> {
            long intValue = this.lowWaterMark + num.intValue();
            if (!this.incompleteOffsets.contains(Long.valueOf(intValue))) {
                hashSet.forEach(encoder -> {
                    encoder.doesNotContainIndex(num.intValue());
                });
            } else {
                log.trace("Found an incomplete offset {}", Long.valueOf(intValue));
                hashSet.forEach(encoder2 -> {
                    encoder2.containsIndex(num.intValue());
                });
            }
        });
        registerEncodings(hashSet);
        log.debug("In order: {}", this.sortedEncodings);
        return this;
    }

    private void registerEncodings(Set<? extends Encoder> set) {
        set.forEach((v0) -> {
            v0.register();
        });
        if (set.stream().noneMatch((v0) -> {
            return v0.quiteSmall();
        })) {
            set.forEach((v0) -> {
                v0.registerCompressed();
            });
        }
    }

    public byte[] packSmallest() {
        EncodedOffsetPair first = this.sortedEncodings.first();
        log.debug("Compression chosen is: {}", first.encoding.name());
        return packEncoding(first);
    }

    byte[] packEncoding(EncodedOffsetPair encodedOffsetPair) {
        ByteBuffer allocate = ByteBuffer.allocate(1 + encodedOffsetPair.data.capacity());
        allocate.put(encodedOffsetPair.encoding.magicByte);
        allocate.put(encodedOffsetPair.data);
        return allocate.array();
    }

    public Set<Long> getIncompleteOffsets() {
        return this.incompleteOffsets;
    }

    public Map<OffsetEncoding, byte[]> getEncodingMap() {
        return this.encodingMap;
    }

    public TreeSet<EncodedOffsetPair> getSortedEncodings() {
        return this.sortedEncodings;
    }
}
