package org.apache.beam.sdk.extensions.smb;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.smb.BucketShardId;
import org.apache.beam.sdk.extensions.smb.FileOperations;
import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.ExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.schemas.transforms.Group;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink.class */
public class SortedBucketSink<K, V> extends PTransform<PCollection<V>, WriteResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SortedBucketSink.class);
    private final BucketMetadata<K, V> bucketMetadata;
    private final SMBFilenamePolicy filenamePolicy;
    private final ResourceId tempDirectory;
    private final FileOperations<V> fileOperations;
    private final int sorterMemoryMb;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$ExtractKeys.class */
    public static class ExtractKeys<K, V> extends DoFn<V, KV<BucketShardId, KV<byte[], V>>> {
        private static final byte[] NULL_SORT_KEY = new byte[0];
        private final BucketMetadata<K, V> bucketMetadata;
        private transient int shardId;

        ExtractKeys(BucketMetadata<K, V> bucketMetadata) {
            this.bucketMetadata = bucketMetadata;
        }

        @DoFn.StartBundle
        public void startBundle() {
            this.shardId = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE) % this.bucketMetadata.getNumShards();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @DoFn.ProcessElement
        public void processElement(DoFn<V, KV<BucketShardId, KV<byte[], V>>>.ProcessContext processContext) {
            Object element = processContext.element();
            byte[] keyBytes = this.bucketMetadata.getKeyBytes(element);
            processContext.output(KV.of(keyBytes != null ? BucketShardId.of(this.bucketMetadata.getBucketId(keyBytes), this.shardId) : BucketShardId.ofNullKey(this.shardId), KV.of(keyBytes != null ? keyBytes : NULL_SORT_KEY, element)));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.delegate(this.bucketMetadata);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$FinalizeTempFiles.class */
    public static class FinalizeTempFiles<V> extends PTransform<PCollectionTuple, WriteResult> {
        private final SMBFilenamePolicy.FileAssignment fileAssignment;
        private final BucketMetadata bucketMetadata;
        private final FileOperations<V> fileOperations;

        /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$FinalizeTempFiles$RenameBuckets.class */
        private static class RenameBuckets<V> extends DoFn<Iterable<KV<BucketShardId, ResourceId>>, KV<BucketShardId, ResourceId>> {
            private final SMBFilenamePolicy.FileAssignment fileAssignment;
            private final BucketMetadata bucketMetadata;
            private final FileOperations<V> fileOperations;

            RenameBuckets(SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata bucketMetadata, FileOperations<V> fileOperations) {
                this.fileAssignment = fileAssignment;
                this.bucketMetadata = bucketMetadata;
                this.fileOperations = fileOperations;
            }

            @DoFn.ProcessElement
            public void processElement(DoFn<Iterable<KV<BucketShardId, ResourceId>>, KV<BucketShardId, ResourceId>>.ProcessContext processContext) {
                HashSet<BucketShardId> hashSet = new HashSet();
                for (int i = 0; i < this.bucketMetadata.getNumBuckets(); i++) {
                    for (int i2 = 0; i2 < this.bucketMetadata.getNumShards(); i2++) {
                        hashSet.add(BucketShardId.of(i, i2));
                    }
                }
                Iterable<KV> iterable = (Iterable) processContext.element();
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                for (KV kv : iterable) {
                    BucketShardId bucketShardId = (BucketShardId) kv.getKey();
                    hashSet.remove(bucketShardId);
                    ResourceId forBucket = this.fileAssignment.forBucket(bucketShardId, this.bucketMetadata);
                    arrayList.add(kv.getValue());
                    arrayList2.add(forBucket);
                    arrayList3.add(KV.of(bucketShardId, forBucket));
                }
                ArrayList arrayList4 = new ArrayList(arrayList);
                for (BucketShardId bucketShardId2 : hashSet) {
                    ResourceId forBucket2 = this.fileAssignment.forBucket(bucketShardId2, this.bucketMetadata);
                    arrayList4.add(forBucket2);
                    try {
                        this.fileOperations.createWriter(forBucket2).close();
                        processContext.output(KV.of(bucketShardId2, forBucket2));
                    } catch (IOException e) {
                        SortedBucketSink.cleanupTempFiles(e, arrayList4);
                        throw new RuntimeException("Failed to write empty file", e);
                    }
                }
                SortedBucketSink.LOG.info("Renaming bucket files");
                try {
                    FileSystems.rename(arrayList, arrayList2, new MoveOptions[0]);
                    processContext.getClass();
                    arrayList3.forEach((v1) -> {
                        r1.output(v1);
                    });
                } catch (IOException e2) {
                    SortedBucketSink.cleanupTempFiles(e2, arrayList4);
                    throw new RuntimeException("Failed to rename temporary files", e2);
                }
            }

            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.delegate(this.fileAssignment);
            }
        }

        FinalizeTempFiles(SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata bucketMetadata, FileOperations<V> fileOperations) {
            this.fileAssignment = fileAssignment;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
        }

        public WriteResult expand(PCollectionTuple pCollectionTuple) {
            return (WriteResult) pCollectionTuple.apply("MoveToFinalDestinations", PTransform.compose(pCollectionTuple2 -> {
                return new WriteResult(pCollectionTuple.getPipeline(), pCollectionTuple2.get(new TupleTag("TempMetadata")).apply("RenameMetadata", ParDo.of(new DoFn<ResourceId, ResourceId>() { // from class: org.apache.beam.sdk.extensions.smb.SortedBucketSink.FinalizeTempFiles.1
                    @DoFn.ProcessElement
                    public void processElement(DoFn<ResourceId, ResourceId>.ProcessContext processContext) throws IOException {
                        ResourceId forMetadata = FinalizeTempFiles.this.fileAssignment.forMetadata();
                        SortedBucketSink.LOG.info("Renaming metadata file");
                        FileSystems.rename(ImmutableList.of(processContext.element()), ImmutableList.of(forMetadata), new MoveOptions[0]);
                        processContext.output(forMetadata);
                    }

                    public void populateDisplayData(DisplayData.Builder builder) {
                        super.populateDisplayData(builder);
                        builder.add(DisplayData.item("Metadata Location", FinalizeTempFiles.this.fileAssignment.forMetadata().toString()));
                    }
                })), pCollectionTuple2.get(new TupleTag("TempBuckets")).apply("GroupAll", Group.globally()).apply("RenameBuckets", ParDo.of(new RenameBuckets(this.fileAssignment, this.bucketMetadata, this.fileOperations))));
            }));
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -1097614790:
                    if (implMethodName.equals("lambda$expand$eb100f5c$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case BucketMetadata.CURRENT_VERSION /* 0 */:
                    if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/smb/SortedBucketSink$FinalizeTempFiles") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/PCollectionTuple;Lorg/apache/beam/sdk/values/PCollectionTuple;)Lorg/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteResult;")) {
                        FinalizeTempFiles finalizeTempFiles = (FinalizeTempFiles) serializedLambda.getCapturedArg(0);
                        PCollectionTuple pCollectionTuple = (PCollectionTuple) serializedLambda.getCapturedArg(1);
                        return pCollectionTuple2 -> {
                            return new WriteResult(pCollectionTuple.getPipeline(), pCollectionTuple2.get(new TupleTag("TempMetadata")).apply("RenameMetadata", ParDo.of(new DoFn<ResourceId, ResourceId>() { // from class: org.apache.beam.sdk.extensions.smb.SortedBucketSink.FinalizeTempFiles.1
                                @DoFn.ProcessElement
                                public void processElement(DoFn<ResourceId, ResourceId>.ProcessContext processContext) throws IOException {
                                    ResourceId forMetadata = FinalizeTempFiles.this.fileAssignment.forMetadata();
                                    SortedBucketSink.LOG.info("Renaming metadata file");
                                    FileSystems.rename(ImmutableList.of(processContext.element()), ImmutableList.of(forMetadata), new MoveOptions[0]);
                                    processContext.output(forMetadata);
                                }

                                public void populateDisplayData(DisplayData.Builder builder) {
                                    super.populateDisplayData(builder);
                                    builder.add(DisplayData.item("Metadata Location", FinalizeTempFiles.this.fileAssignment.forMetadata().toString()));
                                }
                            })), pCollectionTuple2.get(new TupleTag("TempBuckets")).apply("GroupAll", Group.globally()).apply("RenameBuckets", ParDo.of(new RenameBuckets(this.fileAssignment, this.bucketMetadata, this.fileOperations))));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteOperation.class */
    public static class WriteOperation<V> extends PTransform<PCollection<KV<BucketShardId, Iterable<KV<byte[], V>>>>, WriteResult> {
        private final SMBFilenamePolicy filenamePolicy;
        private final BucketMetadata<?, V> bucketMetadata;
        private final FileOperations<V> fileOperations;
        private final ResourceId tempDirectory;

        WriteOperation(SMBFilenamePolicy sMBFilenamePolicy, BucketMetadata<?, V> bucketMetadata, FileOperations<V> fileOperations, ResourceId resourceId) {
            this.filenamePolicy = sMBFilenamePolicy;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
            this.tempDirectory = resourceId;
        }

        public WriteResult expand(PCollection<KV<BucketShardId, Iterable<KV<byte[], V>>>> pCollection) {
            return (WriteResult) pCollection.apply("WriteTempFiles", new WriteTempFiles(this.filenamePolicy.forTempFiles(this.tempDirectory), this.bucketMetadata, this.fileOperations)).apply("FinalizeTempFiles", new FinalizeTempFiles(this.filenamePolicy.forDestination(), this.bucketMetadata, this.fileOperations));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteResult.class */
    public static class WriteResult implements POutput {
        private final Pipeline pipeline;
        private final PCollection<ResourceId> writtenMetadata;
        private final PCollection<KV<BucketShardId, ResourceId>> writtenFiles;

        WriteResult(Pipeline pipeline, PCollection<ResourceId> pCollection, PCollection<KV<BucketShardId, ResourceId>> pCollection2) {
            this.pipeline = pipeline;
            this.writtenMetadata = pCollection;
            this.writtenFiles = pCollection2;
        }

        public Pipeline getPipeline() {
            return this.pipeline;
        }

        public Map<TupleTag<?>, PValue> expand() {
            return ImmutableMap.of(new TupleTag("WrittenMetadata"), this.writtenMetadata, new TupleTag("WrittenFiles"), this.writtenFiles);
        }

        public void finishSpecifyingOutput(String str, PInput pInput, PTransform<?, ?> pTransform) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteTempFiles.class */
    public static class WriteTempFiles<V> extends PTransform<PCollection<KV<BucketShardId, Iterable<KV<byte[], V>>>>, PCollectionTuple> {
        private final SMBFilenamePolicy.FileAssignment fileAssignment;
        private final BucketMetadata bucketMetadata;
        private final FileOperations<V> fileOperations;

        WriteTempFiles(SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata bucketMetadata, FileOperations<V> fileOperations) {
            this.fileAssignment = fileAssignment;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
        }

        public PCollectionTuple expand(PCollection<KV<BucketShardId, Iterable<KV<byte[], V>>>> pCollection) {
            return PCollectionTuple.of(new TupleTag("TempMetadata"), pCollection.getPipeline().apply("WriteTempMetadata", Create.of(Collections.singletonList(writeMetadataFile())))).and(new TupleTag("TempBuckets"), pCollection.apply("WriteTempBuckets", ParDo.of(new DoFn<KV<BucketShardId, Iterable<KV<byte[], V>>>, KV<BucketShardId, ResourceId>>() { // from class: org.apache.beam.sdk.extensions.smb.SortedBucketSink.WriteTempFiles.1
                @DoFn.ProcessElement
                public void processElement(DoFn<KV<BucketShardId, Iterable<KV<byte[], V>>>, KV<BucketShardId, ResourceId>>.ProcessContext processContext) throws IOException {
                    BucketShardId bucketShardId = (BucketShardId) ((KV) processContext.element()).getKey();
                    Iterable iterable = (Iterable) ((KV) processContext.element()).getValue();
                    ResourceId forBucket = WriteTempFiles.this.fileAssignment.forBucket(bucketShardId, WriteTempFiles.this.bucketMetadata);
                    SortedBucketSink.LOG.info("Writing sorted-bucket {} to temporary file {}", bucketShardId, forBucket);
                    FileOperations.Writer<V> createWriter = WriteTempFiles.this.fileOperations.createWriter(forBucket);
                    Throwable th = null;
                    try {
                        try {
                            iterable.forEach(kv -> {
                                try {
                                    createWriter.write(kv.getValue());
                                } catch (IOException e) {
                                    SortedBucketSink.cleanupTempFiles(e, Collections.singleton(forBucket));
                                    throw new RuntimeException("Failed to write sorted-bucket file", e);
                                }
                            });
                            if (createWriter != null) {
                                if (0 != 0) {
                                    try {
                                        createWriter.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    createWriter.close();
                                }
                            }
                            processContext.output(KV.of(bucketShardId, forBucket));
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (createWriter != null) {
                            if (th != null) {
                                try {
                                    createWriter.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createWriter.close();
                            }
                        }
                        throw th3;
                    }
                }

                public void populateDisplayData(DisplayData.Builder builder) {
                    super.populateDisplayData(builder);
                    builder.delegate(WriteTempFiles.this.fileAssignment);
                    builder.delegate(WriteTempFiles.this.fileOperations);
                }
            })));
        }

        private ResourceId writeMetadataFile() {
            ResourceId forMetadata = this.fileAssignment.forMetadata();
            SortedBucketSink.LOG.info("Writing metadata to temporary file {}", forMetadata);
            try {
                OutputStream newOutputStream = Channels.newOutputStream(FileSystems.create(forMetadata, "application/json"));
                Throwable th = null;
                try {
                    BucketMetadata.to(this.bucketMetadata, newOutputStream);
                    if (newOutputStream != null) {
                        if (0 != 0) {
                            try {
                                newOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            newOutputStream.close();
                        }
                    }
                    return forMetadata;
                } finally {
                }
            } catch (IOException e) {
                SortedBucketSink.cleanupTempFiles(e, Collections.singleton(forMetadata));
                throw new RuntimeException("Failed to write metadata file", e);
            }
        }
    }

    public SortedBucketSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i) {
        this.bucketMetadata = bucketMetadata;
        this.filenamePolicy = new SMBFilenamePolicy(resourceId, str);
        this.tempDirectory = resourceId2;
        this.fileOperations = fileOperations;
        this.sorterMemoryMb = i;
    }

    public final WriteResult expand(PCollection<V> pCollection) {
        Preconditions.checkArgument(pCollection.isBounded() == PCollection.IsBounded.BOUNDED, "SortedBucketSink cannot be applied to a non-bounded PCollection");
        return (WriteResult) pCollection.apply("ExtractKeys", ParDo.of(new ExtractKeys(this.bucketMetadata))).setCoder(KvCoder.of(BucketShardId.BucketShardIdCoder.of(), KvCoder.of(ByteArrayCoder.of(), pCollection.getCoder()))).apply("GroupByKey", GroupByKey.create()).apply("SortValues", SortValues.create(BufferedExternalSorter.options().withExternalSorterType(ExternalSorter.Options.SorterType.NATIVE).withMemoryMB(this.sorterMemoryMb))).apply("WriteOperation", new WriteOperation(this.filenamePolicy, this.bucketMetadata, this.fileOperations, this.tempDirectory));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void cleanupTempFiles(Exception exc, Collection<ResourceId> collection) {
        LOG.info("Deleting temporary file {}", collection.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(", ")));
        try {
            FileSystems.delete(collection, new MoveOptions[]{MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES});
        } catch (IOException e) {
            exc.addSuppressed(e);
        }
    }
}
