package org.apache.beam.sdk.extensions.smb;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.smb.BucketMetadata;
import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGbkResultSchema;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedBytes;

/* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource.class */
public class SortedBucketSource<FinalKeyT> extends PTransform<PBegin, PCollection<KV<FinalKeyT, CoGbkResult>>> {
    private static final Comparator<byte[]> bytesComparator = UnsignedBytes.lexicographicalComparator();
    private final Class<FinalKeyT> finalKeyClass;
    private final transient List<BucketedInput<?, ?>> sources;

    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource$BucketedInput.class */
    public static class BucketedInput<K, V> {
        private final TupleTag<V> tupleTag;
        private final String filenameSuffix;
        private final FileOperations<V> fileOperations;
        private final List<ResourceId> inputDirectories;
        private transient Map<ResourceId, DirectoryMetadata> inputMetadata;
        private transient BucketMetadata<K, V> canonicalMetadata;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource$BucketedInput$BucketedInputCoder.class */
        public static class BucketedInputCoder<K, V> extends AtomicCoder<BucketedInput<K, V>> {
            private static SerializableCoder<TupleTag> tupleTagCoder = SerializableCoder.of(TupleTag.class);
            private static StringUtf8Coder fileSuffixCoder = StringUtf8Coder.of();
            private static SerializableCoder<FileOperations> fileOperationsCoder = SerializableCoder.of(FileOperations.class);
            private static MapCoder<ResourceId, DirectoryMetadata> inputMetadataCoder = MapCoder.of(ResourceIdCoder.of(), SerializableCoder.of(DirectoryMetadata.class));
            private BucketMetadata.BucketMetadataCoder<K, V> canonicalMetadataCoder;

            private BucketedInputCoder() {
                this.canonicalMetadataCoder = new BucketMetadata.BucketMetadataCoder<>();
            }

            public void encode(BucketedInput<K, V> bucketedInput, OutputStream outputStream) throws IOException {
                tupleTagCoder.encode(((BucketedInput) bucketedInput).tupleTag, outputStream);
                fileSuffixCoder.encode(((BucketedInput) bucketedInput).filenameSuffix, outputStream);
                fileOperationsCoder.encode(((BucketedInput) bucketedInput).fileOperations, outputStream);
                inputMetadataCoder.encode(((BucketedInput) bucketedInput).inputMetadata, outputStream);
                this.canonicalMetadataCoder.encode((BucketMetadata) ((BucketedInput) bucketedInput).canonicalMetadata, outputStream);
            }

            /* renamed from: decode, reason: merged with bridge method [inline-methods] */
            public BucketedInput<K, V> m21decode(InputStream inputStream) throws IOException {
                return new BucketedInput<>(tupleTagCoder.decode(inputStream), fileSuffixCoder.decode(inputStream), (FileOperations) fileOperationsCoder.decode(inputStream), inputMetadataCoder.decode(inputStream), this.canonicalMetadataCoder.m10decode(inputStream));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource$BucketedInput$DirectoryMetadata.class */
        public static class DirectoryMetadata implements Serializable {
            final SMBFilenamePolicy.FileAssignment fileAssignment;
            final int numBuckets;
            final int numShards;

            DirectoryMetadata(SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata<?, ?> bucketMetadata) {
                this.fileAssignment = fileAssignment;
                this.numBuckets = bucketMetadata.getNumBuckets();
                this.numShards = bucketMetadata.getNumShards();
            }
        }

        public BucketedInput(TupleTag<V> tupleTag, ResourceId resourceId, String str, FileOperations<V> fileOperations) {
            this(tupleTag, (List<ResourceId>) Collections.singletonList(resourceId), str, fileOperations);
        }

        public BucketedInput(TupleTag<V> tupleTag, List<ResourceId> list, String str, FileOperations<V> fileOperations) {
            this.tupleTag = tupleTag;
            this.filenameSuffix = str;
            this.fileOperations = fileOperations;
            this.inputDirectories = list;
        }

        private BucketedInput(TupleTag<V> tupleTag, String str, FileOperations<V> fileOperations, Map<ResourceId, DirectoryMetadata> map, BucketMetadata<K, V> bucketMetadata) {
            this.tupleTag = tupleTag;
            this.filenameSuffix = str;
            this.fileOperations = fileOperations;
            this.inputDirectories = new ArrayList(map.keySet());
            this.inputMetadata = map;
            this.canonicalMetadata = bucketMetadata;
        }

        public TupleTag<V> getTupleTag() {
            return this.tupleTag;
        }

        public Coder<V> getCoder() {
            return this.fileOperations.getCoder();
        }

        public BucketMetadata<K, V> getMetadata() {
            computeMetadata();
            return this.canonicalMetadata;
        }

        private void computeMetadata() {
            if (this.inputMetadata == null || this.canonicalMetadata == null) {
                this.inputMetadata = new HashMap();
                this.canonicalMetadata = null;
                ResourceId resourceId = null;
                for (ResourceId resourceId2 : this.inputDirectories) {
                    SMBFilenamePolicy.FileAssignment forDestination = new SMBFilenamePolicy(resourceId2, this.filenameSuffix).forDestination();
                    try {
                        BucketMetadata<K, V> from = BucketMetadata.from(Channels.newInputStream(FileSystems.open(forDestination.forMetadata())));
                        if (this.canonicalMetadata == null || from.getNumBuckets() < this.canonicalMetadata.getNumBuckets()) {
                            this.canonicalMetadata = from;
                            resourceId = resourceId2;
                        }
                        Preconditions.checkState(from.isCompatibleWith(this.canonicalMetadata) && from.isPartitionCompatible(this.canonicalMetadata), "%s cannot be read as a single input source. Metadata in directory %s is incompatible with metadata in directory %s. %s != %s", new Object[]{this, resourceId2, resourceId, from, this.canonicalMetadata});
                        this.inputMetadata.put(resourceId2, new DirectoryMetadata(forDestination, from));
                    } catch (IOException e) {
                        throw new RuntimeException("Error fetching metadata for " + resourceId2, e);
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public KeyGroupIterator<byte[], V> createIterator(int i, int i2) {
            ArrayList arrayList = new ArrayList();
            this.inputMetadata.forEach((resourceId, directoryMetadata) -> {
                int i3 = directoryMetadata.numBuckets;
                int i4 = directoryMetadata.numShards;
                int i5 = i;
                while (true) {
                    int i6 = i5;
                    if (i6 >= i3) {
                        return;
                    }
                    for (int i7 = 0; i7 < i4; i7++) {
                        try {
                            arrayList.add(this.fileOperations.iterator(directoryMetadata.fileAssignment.forBucket(BucketShardId.of(i6, i7), i3, i4)));
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
                    i5 = i6 + i2;
                }
            });
            BucketMetadata<K, V> bucketMetadata = this.canonicalMetadata;
            bucketMetadata.getClass();
            Function function = bucketMetadata::getKeyBytes;
            return new KeyGroupIterator<>(Iterators.mergeSorted(arrayList, (obj, obj2) -> {
                return SortedBucketSource.bytesComparator.compare(function.apply(obj), function.apply(obj2));
            }), function, SortedBucketSource.bytesComparator);
        }

        public String toString() {
            Object[] objArr = new Object[3];
            objArr[0] = this.tupleTag.getId();
            objArr[1] = this.inputDirectories.size() > 5 ? this.inputDirectories.subList(0, 4) + "..." + this.inputDirectories.get(this.inputDirectories.size() - 1) : this.inputDirectories;
            objArr[2] = this.canonicalMetadata;
            return String.format("BucketedInput[tupleTag=%s, inputDirectories=[%s], metadata=%s]", objArr);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource$BucketedInputs.class */
    public static class BucketedInputs implements PInput {
        private final transient Pipeline pipeline;
        private final List<BucketedInput<?, ?>> sources;
        private final Integer numBuckets;

        private BucketedInputs(Pipeline pipeline, Integer num, List<BucketedInput<?, ?>> list) {
            this.pipeline = pipeline;
            this.numBuckets = num;
            this.sources = list;
        }

        public Pipeline getPipeline() {
            return this.pipeline;
        }

        public Map<TupleTag<?>, PValue> expand() {
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < this.numBuckets.intValue(); i++) {
                try {
                    arrayList.add(KV.of(Integer.valueOf(i), this.sources));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return Collections.singletonMap(new TupleTag("BucketedSources"), this.pipeline.apply("CreateBucketedSources", Create.of(arrayList).withCoder(KvCoder.of(VarIntCoder.of(), ListCoder.of(new BucketedInput.BucketedInputCoder())))));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSource$MergeBuckets.class */
    public static class MergeBuckets<FinalKeyT> extends DoFn<KV<Integer, List<BucketedInput<?, ?>>>, KV<FinalKeyT, CoGbkResult>> {
        private static final Comparator<Map.Entry<TupleTag, KV<byte[], Iterator<?>>>> keyComparator = (entry, entry2) -> {
            return SortedBucketSource.bytesComparator.compare(((KV) entry.getValue()).getKey(), ((KV) entry2.getValue()).getKey());
        };
        private final Integer leastNumBuckets;
        private final Coder<FinalKeyT> keyCoder;

        MergeBuckets(int i, Coder<FinalKeyT> coder) {
            this.leastNumBuckets = Integer.valueOf(i);
            this.keyCoder = coder;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<Integer, List<BucketedInput<?, ?>>>, KV<FinalKeyT, CoGbkResult>>.ProcessContext processContext) {
            int i;
            int intValue = ((Integer) ((KV) processContext.element()).getKey()).intValue();
            List list = (List) ((KV) processContext.element()).getValue();
            int size = list.size();
            KeyGroupIterator[] keyGroupIteratorArr = (KeyGroupIterator[]) list.stream().map(bucketedInput -> {
                return bucketedInput.createIterator(intValue, this.leastNumBuckets.intValue());
            }).toArray(i2 -> {
                return new KeyGroupIterator[i2];
            });
            List list2 = (List) list.stream().map((v0) -> {
                return v0.getTupleTag();
            }).collect(Collectors.toList());
            CoGbkResultSchema of = CoGbkResultSchema.of(list2);
            HashMap hashMap = new HashMap();
            do {
                i = 0;
                for (int i3 = 0; i3 < size; i3++) {
                    KeyGroupIterator keyGroupIterator = keyGroupIteratorArr[i3];
                    if (!hashMap.containsKey(list2.get(i3))) {
                        if (keyGroupIterator.hasNext()) {
                            hashMap.put(list2.get(i3), keyGroupIterator.next());
                        } else {
                            i++;
                        }
                    }
                }
                if (hashMap.isEmpty()) {
                    return;
                }
                Map.Entry<TupleTag, KV<byte[], Iterator<?>>> entry = (Map.Entry) hashMap.entrySet().stream().min(keyComparator).orElse(null);
                Iterator it = hashMap.entrySet().iterator();
                ArrayList arrayList = new ArrayList();
                for (int i4 = 0; i4 < of.size(); i4++) {
                    arrayList.add(new ArrayList());
                }
                while (it.hasNext()) {
                    Map.Entry<TupleTag, KV<byte[], Iterator<?>>> entry2 = (Map.Entry) it.next();
                    if (keyComparator.compare(entry2, entry) == 0) {
                        List list3 = (List) arrayList.get(of.getIndex(entry2.getKey()));
                        Iterator it2 = (Iterator) entry2.getValue().getValue();
                        list3.getClass();
                        it2.forEachRemaining(list3::add);
                        it.remove();
                    }
                }
                try {
                    processContext.output(KV.of(this.keyCoder.decode(new ByteArrayInputStream((byte[]) entry.getValue().getKey())), CoGbkResultUtil.newCoGbkResult(of, arrayList)));
                } catch (Exception e) {
                    throw new RuntimeException("Could not decode key bytes for group", e);
                }
            } while (i != size);
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("keyCoder", this.keyCoder.getClass()));
            builder.add(DisplayData.item("leastNumBuckets", this.leastNumBuckets));
        }
    }

    public SortedBucketSource(Class<FinalKeyT> cls, List<BucketedInput<?, ?>> list) {
        this.finalKeyClass = cls;
        this.sources = list;
    }

    public final PCollection<KV<FinalKeyT, CoGbkResult>> expand(PBegin pBegin) {
        BucketMetadata<?, ?> bucketMetadata = null;
        Coder<?> coder = null;
        int i = Integer.MAX_VALUE;
        for (BucketedInput<?, ?> bucketedInput : this.sources) {
            BucketMetadata<?, ?> metadata = bucketedInput.getMetadata();
            if (bucketMetadata == null) {
                bucketMetadata = metadata;
            } else {
                Preconditions.checkState(bucketMetadata.isCompatibleWith(metadata), "Source %s is incompatible with source %s", this.sources.get(0), bucketedInput);
            }
            i = Math.min(metadata.getNumBuckets(), i);
            if (metadata.getKeyClass() == this.finalKeyClass && coder == null) {
                try {
                    coder = metadata.getKeyCoder();
                } catch (CannotProvideCoderException e) {
                    throw new RuntimeException("Could not provide coder for key class " + this.finalKeyClass, e);
                } catch (Coder.NonDeterministicException e2) {
                    throw new RuntimeException("Non-deterministic coder for key class " + this.finalKeyClass, e2);
                }
            }
        }
        Preconditions.checkNotNull(coder, "Could not infer coder for key class %s", this.finalKeyClass);
        return new BucketedInputs(pBegin.getPipeline(), Integer.valueOf(i), this.sources).expand().get(new TupleTag("BucketedSources")).apply("ReshuffleKeys", Reshuffle.viaRandomKey()).apply("MergeBuckets", ParDo.of(new MergeBuckets(i, coder))).setCoder(KvCoder.of(coder, CoGbkResult.CoGbkResultCoder.of(CoGbkResultSchema.of((List) this.sources.stream().map((v0) -> {
            return v0.getTupleTag();
        }).collect(Collectors.toList())), UnionCoder.of((List) this.sources.stream().map((v0) -> {
            return v0.getCoder();
        }).collect(Collectors.toList())))));
    }
}
