package org.apache.beam.sdk.extensions.smb;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.spotify.scio.transforms.DoFnWithResource;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.extensions.smb.BucketShardId;
import org.apache.beam.sdk.extensions.smb.FileOperations;
import org.apache.beam.sdk.extensions.smb.SMBFilenamePolicy;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.ExternalSorter;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sample;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink.class */
public class SortedBucketSink<K, V> extends PTransform<PCollection<V>, WriteResult> {
    private static final Logger LOG = LoggerFactory.getLogger(SortedBucketSink.class);
    private final BucketMetadata<K, V> bucketMetadata;
    private final SMBFilenamePolicy filenamePolicy;
    private final ResourceId tempDirectory;
    private final FileOperations<V> fileOperations;
    private final int sorterMemoryMb;
    private final int keyCacheSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$ExtractKeys.class */
    /**
     * Maps each input record to its destination {@link BucketShardId}, paired with the
     * encoded sort key and the encoded record bytes.
     *
     * <p>The shard id is re-drawn at random at the start of every bundle, so all records of
     * one bundle that land in the same bucket share a shard.
     *
     * @param <K> key type extracted from each record
     * @param <V> record type
     */
    public static class ExtractKeys<K, V> extends DoFn<V, KV<BucketShardId, KV<byte[], byte[]>>> {
        /** Sort key emitted for records whose key encodes to {@code null}. */
        static final byte[] NULL_SORT_KEY = new byte[0];
        private final SerializableFunction<V, K> extractKeyFn;
        private final Coder<V> valueCoder;
        private final BucketMetadata<K, ?> bucketMetadata;
        private transient int shardId;

        /**
         * Chooses the key-extraction DoFn.
         *
         * @param bucketMetadata metadata used to encode keys and assign bucket ids
         * @param serializableFunction extracts the key from a record
         * @param coder encodes the record into the bytes carried downstream
         * @param i key cache size; {@code 0} selects the non-caching {@link ExtractKeys},
         *     any other value selects {@link ExtractKeysWithCache} with that cache size
         * @return the DoFn to apply to the input records
         */
        static <KeyT, ValueT> DoFn<ValueT, KV<BucketShardId, KV<byte[], byte[]>>> of(BucketMetadata<KeyT, ?> bucketMetadata, SerializableFunction<ValueT, KeyT> serializableFunction, Coder<ValueT> coder, int i) {
            if (i == 0) {
                return new ExtractKeys<>(bucketMetadata, serializableFunction, coder);
            }
            return new ExtractKeysWithCache<>(bucketMetadata, serializableFunction, coder, i);
        }

        private ExtractKeys(BucketMetadata<K, ?> bucketMetadata, SerializableFunction<V, K> serializableFunction, Coder<V> coder) {
            this.bucketMetadata = bucketMetadata;
            this.extractKeyFn = serializableFunction;
            this.valueCoder = coder;
        }

        /** Picks a random shard in {@code [0, numShards)} for the bundle that is starting. */
        @DoFn.StartBundle
        public void startBundle() {
            this.shardId = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE) % this.bucketMetadata.getNumShards();
        }

        /**
         * Encodes a key and resolves the bucket/shard it belongs to.
         *
         * @param keyt the extracted key
         * @param bucketMetadata metadata providing key encoding and bucket assignment
         * @param i shard id to combine with the computed bucket id
         * @return the bucket/shard id with the encoded key bytes, or the null-key bucket with
         *     {@link #NULL_SORT_KEY} when the key encodes to {@code null}
         */
        static <KeyT> KV<BucketShardId, byte[]> processKey(KeyT keyt, BucketMetadata<KeyT, ?> bucketMetadata, int i) {
            byte[] keyBytes = bucketMetadata.encodeKeyBytes(keyt);
            if (keyBytes == null) {
                return KV.of(BucketShardId.ofNullKey(), NULL_SORT_KEY);
            }
            return KV.of(BucketShardId.of(bucketMetadata.getBucketId(keyBytes), i), keyBytes);
        }

        /**
         * Encodes a record with the given coder.
         *
         * @throws RuntimeException wrapping the {@link CoderException} if encoding fails
         */
        static <ValueT> byte[] getValueBytes(Coder<ValueT> coder, ValueT valuet) {
            try {
                return CoderUtils.encodeToByteArray(coder, valuet);
            } catch (CoderException e) {
                throw new RuntimeException(e);
            }
        }

        /** Emits {@code (bucketShardId, (sortKeyBytes, recordBytes))} for the current record. */
        @DoFn.ProcessElement
        public void processElement(DoFn<V, KV<BucketShardId, KV<byte[], byte[]>>>.ProcessContext processContext) {
            V element = processContext.element();
            KV<BucketShardId, byte[]> bucketAndSortKey = processKey(this.extractKeyFn.apply(element), this.bucketMetadata, this.shardId);
            byte[] valueBytes = getValueBytes(this.valueCoder, element);
            processContext.output(KV.of(bucketAndSortKey.getKey(), KV.of(bucketAndSortKey.getValue(), valueBytes)));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.delegate(this.bucketMetadata);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$ExtractKeysWithCache.class */
    /**
     * Variant of {@link ExtractKeys} that memoizes, per extracted key, the bucket id and the
     * encoded key bytes in a size-bounded Caffeine cache, so repeated keys skip re-encoding.
     *
     * <p>Cache hits and misses are reported through the {@code cacheHits} and
     * {@code cacheMisses} counters.
     *
     * @param <K> key type extracted from each record (used as the cache key)
     * @param <V> record type
     */
    public static class ExtractKeysWithCache<K, V> extends DoFnWithResource<V, KV<BucketShardId, KV<byte[], byte[]>>, Cache<K, KV<Integer, byte[]>>> {
        private final SerializableFunction<V, K> extractKeyFn;
        private final Coder<V> valueCoder;
        private final BucketMetadata<K, ?> bucketMetadata;
        private final int cacheSize;
        private transient int shardId;
        private final Counter cacheHits = Metrics.counter(SortedBucketSink.class, "cacheHits");
        private final Counter cacheMisses = Metrics.counter(SortedBucketSink.class, "cacheMisses");

        ExtractKeysWithCache(BucketMetadata<K, ?> bucketMetadata, SerializableFunction<V, K> serializableFunction, Coder<V> coder, int i) {
            this.bucketMetadata = bucketMetadata;
            this.extractKeyFn = serializableFunction;
            this.valueCoder = coder;
            this.cacheSize = i;
        }

        public DoFnWithResource.ResourceType getResourceType() {
            return DoFnWithResource.ResourceType.PER_INSTANCE;
        }

        /**
         * Builds the key cache, bounded to {@code cacheSize} entries.
         *
         * <p>The decompiler had renamed this method to {@code m32createResource}; the original
         * name is restored so that it implements the superclass's {@code createResource}.
         */
        public Cache<K, KV<Integer, byte[]>> createResource() {
            return Caffeine.newBuilder().maximumSize(this.cacheSize).build();
        }

        /** Picks a random shard in {@code [0, numShards)} for the bundle that is starting. */
        @DoFn.StartBundle
        public void startBundle() {
            this.shardId = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE) % this.bucketMetadata.getNumShards();
        }

        /**
         * Emits {@code (bucketShardId, (sortKeyBytes, recordBytes))} for the current record,
         * consulting the cache for the bucket id and key bytes of non-null keys.
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<V, KV<BucketShardId, KV<byte[], byte[]>>>.ProcessContext processContext) {
            V element = processContext.element();
            K key = this.extractKeyFn.apply(element);

            KV<BucketShardId, byte[]> bucketAndSortKey;
            if (key == null) {
                // Null keys bypass the cache and go straight to the null-key bucket.
                bucketAndSortKey = KV.of(BucketShardId.ofNullKey(), ExtractKeys.NULL_SORT_KEY);
            } else {
                Cache<K, KV<Integer, byte[]>> cache = getResource();
                KV<Integer, byte[]> cached = cache.getIfPresent(key);
                if (cached != null) {
                    this.cacheHits.inc();
                    // Only the bucket id is cached; the shard is the one chosen for this bundle.
                    bucketAndSortKey = KV.of(BucketShardId.of(cached.getKey(), this.shardId), cached.getValue());
                } else {
                    this.cacheMisses.inc();
                    bucketAndSortKey = ExtractKeys.processKey(key, this.bucketMetadata, this.shardId);
                    cache.put(key, KV.of(bucketAndSortKey.getKey().getBucketId(), bucketAndSortKey.getValue()));
                }
            }

            byte[] valueBytes = ExtractKeys.getValueBytes(this.valueCoder, element);
            processContext.output(KV.of(bucketAndSortKey.getKey(), KV.of(bucketAndSortKey.getValue(), valueBytes)));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.delegate(this.bucketMetadata);
            builder.add(DisplayData.item("keyCacheSize", Integer.valueOf(this.cacheSize)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$RenameBuckets.class */
    /**
     * Finalizes a write: moves the temporary bucket files to their destination names, creates
     * empty files for bucket/shards that received no data, writes the metadata file and removes
     * the temporary directory.
     *
     * <p>The result is a {@link PCollectionTuple} tagged {@code "writtenBuckets"} (bucket/shard
     * id to final file) and {@code "writtenMetadata"} (the metadata file).
     *
     * @param <V> record type handled by the {@link FileOperations}
     */
    public static class RenameBuckets<V> extends PTransform<PCollection<KV<BucketShardId, ResourceId>>, PCollectionTuple> {
        private final ResourceId tempDirectory;
        private final SMBFilenamePolicy.FileAssignment fileAssignment;
        private final BucketMetadata<?, ?> bucketMetadata;
        private final FileOperations<V> fileOperations;

        RenameBuckets(ResourceId resourceId, SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata bucketMetadata, FileOperations<V> fileOperations) {
            this.tempDirectory = resourceId;
            this.fileAssignment = fileAssignment;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
        }

        /**
         * Runs {@link #moveFiles} once, inside a DoFn seeded with a single dummy element, with
         * the full map of written temp files supplied as a side input.
         */
        public PCollectionTuple expand(PCollection<KV<BucketShardId, ResourceId>> pCollection) {
            final PCollectionView<Map<BucketShardId, ResourceId>> writtenBuckets =
                pCollection.apply("WrittenBucketShardIds", View.asMap());
            final TupleTag<KV<BucketShardId, ResourceId>> bucketsTag = new TupleTag<>("writtenBuckets");
            final TupleTag<ResourceId> metadataTag = new TupleTag<>("writtenMetadata");

            return pCollection
                .getPipeline()
                .apply("InitializeTmpMove", Create.of(0))
                .apply(
                    "PopulateFinalDst",
                    ParDo.of(new DoFn<Integer, KV<BucketShardId, ResourceId>>() {
                        @DoFn.ProcessElement
                        public void processElement(DoFn<Integer, KV<BucketShardId, ResourceId>>.ProcessContext processContext) throws IOException {
                            RenameBuckets.moveFiles(
                                RenameBuckets.this.tempDirectory,
                                RenameBuckets.this.bucketMetadata,
                                processContext.sideInput(writtenBuckets),
                                RenameBuckets.this.fileAssignment,
                                RenameBuckets.this.fileOperations,
                                bucketDst -> processContext.output(bucketsTag, bucketDst),
                                metadataDst -> processContext.output(metadataTag, metadataDst),
                                true);
                        }
                    })
                    .withSideInputs(writtenBuckets)
                    .withOutputTags(bucketsTag, TupleTagList.of(metadataTag)));
        }

        /**
         * Moves temp bucket files into place and cleans up the temp directory.
         *
         * <p>Steps, in order: (1) for every expected bucket/shard either queue its temp file
         * for renaming or, when nothing was written, create an empty destination file and
         * report it immediately; (2) rename the queued files, skipping missing sources and
         * existing destinations; (3) report the renamed files; (4) write and report the
         * metadata file; (5) delete everything matching {@code resourceId + "*"} and then
         * {@code resourceId} itself.
         *
         * @param resourceId temporary directory to clean up afterwards
         * @param bucketMetadata metadata supplying the expected bucket/shard ids
         * @param map temp file actually written for each bucket/shard
         * @param fileAssignment destination naming policy
         * @param fileOperations used to create empty files for missing bucket/shards
         * @param consumer receives each bucket/shard id with its final file
         * @param consumer2 receives the written metadata file
         * @param z whether the null-key bucket is part of the expected set
         * @throws IOException if creating, renaming, matching or deleting files fails
         */
        public static void moveFiles(ResourceId resourceId, BucketMetadata<?, ?> bucketMetadata, Map<BucketShardId, ResourceId> map, SMBFilenamePolicy.FileAssignment fileAssignment, FileOperations fileOperations, Consumer<KV<BucketShardId, ResourceId>> consumer, Consumer<ResourceId> consumer2, boolean z) throws IOException {
            ArrayList<ResourceId> srcFiles = new ArrayList<>();
            ArrayList<ResourceId> dstFiles = new ArrayList<>();
            ArrayList<KV<BucketShardId, ResourceId>> movedBuckets = new ArrayList<>();

            // NOTE(review): this adds to the set returned by the metadata, as the original did;
            // it relies on getAllBucketShardIds() handing out a mutable set - confirm.
            Set<BucketShardId> allBucketShardIds = bucketMetadata.getAllBucketShardIds();
            if (z) {
                allBucketShardIds.add(BucketShardId.ofNullKey());
            }

            for (BucketShardId bucketShardId : allBucketShardIds) {
                ResourceId destination = fileAssignment.forBucket(bucketShardId, bucketMetadata);
                if (map.containsKey(bucketShardId)) {
                    srcFiles.add(map.get(bucketShardId));
                    dstFiles.add(destination);
                    movedBuckets.add(KV.of(bucketShardId, destination));
                } else {
                    // Nothing was written for this bucket/shard: materialize an empty file.
                    fileOperations.createWriter(destination).close();
                    consumer.accept(KV.of(bucketShardId, destination));
                }
            }

            SortedBucketSink.LOG.info("Moving {} bucket files into {}", Integer.valueOf(srcFiles.size()), fileAssignment.getDirectory());
            FileSystems.rename(
                srcFiles,
                dstFiles,
                MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES,
                MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);

            // Renamed files are only reported once the rename call has returned.
            movedBuckets.forEach(consumer);
            consumer2.accept(writeMetadataFile(fileAssignment.forMetadata(), bucketMetadata));

            ArrayList<ResourceId> leftovers = new ArrayList<>();
            for (MatchResult matchResult : FileSystems.match(Collections.singletonList(resourceId.toString() + "*"))) {
                if (matchResult.status() == MatchResult.Status.OK) {
                    for (MatchResult.Metadata metadata : matchResult.metadata()) {
                        leftovers.add(metadata.resourceId());
                    }
                }
            }
            FileSystems.delete(leftovers, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
            FileSystems.delete(Collections.singletonList(resourceId), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
        }

        /**
         * Serializes the bucket metadata to {@code resourceId} with MIME type
         * {@code application/json}.
         *
         * @return {@code resourceId}, for chaining
         * @throws RuntimeException if writing fails; the target file is deleted first (best
         *     effort) and the {@link IOException} is attached as the cause
         */
        static ResourceId writeMetadataFile(ResourceId resourceId, BucketMetadata bucketMetadata) {
            SortedBucketSink.LOG.info("Writing metadata to file {}", resourceId);
            // try-with-resources closes the stream even when serialization throws.
            try (OutputStream out = Channels.newOutputStream(FileSystems.create(resourceId, "application/json"))) {
                BucketMetadata.to(bucketMetadata, out);
            } catch (IOException e) {
                SortedBucketSink.cleanupTempFiles(e, Collections.singleton(resourceId));
                throw new RuntimeException("Failed to write metadata file", e);
            }
            return resourceId;
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.delegate(this.fileAssignment);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$SortBytesDoFn.class */
    /**
     * Sorts the {@code (sortKey, value)} byte pairs grouped under one key with a
     * {@link BufferedExternalSorter} and re-emits them under the same key.
     *
     * <p>Two counters, named {@code <prefix>-bucketsInitiatedSorting} and
     * {@code <prefix>-bucketsCompletedSorting}, track progress.
     *
     * @param <K> grouping key type, passed through unchanged
     */
    public static class SortBytesDoFn<K> extends DoFn<KV<K, Iterable<KV<byte[], byte[]>>>, KV<K, Iterable<KV<byte[], byte[]>>>> {
        private final BufferedExternalSorter.Options sorterOptions;
        private final Counter bucketsInitiatedSorting;
        private final Counter bucketsCompletedSorting;

        /**
         * @param str prefix for the two counter names
         * @param options configuration used to create one sorter per element
         */
        SortBytesDoFn(String str, BufferedExternalSorter.Options options) {
            this.sorterOptions = options;
            this.bucketsInitiatedSorting = Metrics.counter(SortedBucketSink.class, str + "-bucketsInitiatedSorting");
            this.bucketsCompletedSorting = Metrics.counter(SortedBucketSink.class, str + "-bucketsCompletedSorting");
        }

        /**
         * Feeds every pair of the group into a fresh sorter and outputs the sorted result.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised by the sorter
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, Iterable<KV<byte[], byte[]>>>, KV<K, Iterable<KV<byte[], byte[]>>>>.ProcessContext processContext) {
            KV<K, Iterable<KV<byte[], byte[]>>> group = processContext.element();
            BufferedExternalSorter sorter = BufferedExternalSorter.create(this.sorterOptions);
            try {
                this.bucketsInitiatedSorting.inc();
                for (KV<byte[], byte[]> entry : group.getValue()) {
                    sorter.add(entry);
                }
                processContext.output(KV.of(group.getKey(), sorter.sort()));
                this.bucketsCompletedSorting.inc();
            } catch (IOException e) {
                throw new RuntimeException("Exception sorting buckets", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$SortedBucketPreKeyedSink.class */
    /**
     * Sorted-bucket sink for input that is already keyed: the bucketing key is taken from
     * each {@code KV}'s key and only the {@code KV}'s value is written.
     *
     * <p>Optionally checks, on a sample of up to 100 elements, that the supplied key encodes
     * to the same bytes as the key the {@link BucketMetadata} extracts from the value.
     *
     * @param <K> key type
     * @param <V> record type written to the bucket files
     */
    public static class SortedBucketPreKeyedSink<K, V> extends PTransform<PCollection<KV<K, V>>, WriteResult> {
        private final BucketMetadata<K, V> bucketMetadata;
        private final SMBFilenamePolicy filenamePolicy;
        private final ResourceId tempDirectory;
        private final FileOperations<V> fileOperations;
        private final int sorterMemoryMb;
        private final int keyCacheSize;
        private final Coder<V> valueCoder;
        private final boolean verifyKeyExtraction;

        /** Same as the eight-argument constructor with key verification enabled. */
        public SortedBucketPreKeyedSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i, Coder<V> coder) {
            this(bucketMetadata, resourceId, resourceId2, str, fileOperations, i, coder, true);
        }

        /** Same as the nine-argument constructor with a key cache size of {@code 0}. */
        public SortedBucketPreKeyedSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i, Coder<V> coder, boolean z) {
            this(bucketMetadata, resourceId, resourceId2, str, fileOperations, i, coder, z, 0);
        }

        /**
         * @param bucketMetadata bucketing metadata; also supplies the filename prefix
         * @param resourceId first argument of the {@link SMBFilenamePolicy} built here
         * @param resourceId2 temporary directory
         * @param str third argument of the {@link SMBFilenamePolicy} built here
         * @param fileOperations reader/writer factory for the bucket files
         * @param i sorter memory, in MB
         * @param coder coder for the record values
         * @param z whether to verify the pre-extracted keys against the metadata
         * @param i2 key cache size; {@code 0} disables the cache
         */
        public SortedBucketPreKeyedSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i, Coder<V> coder, boolean z, int i2) {
            this.bucketMetadata = bucketMetadata;
            this.filenamePolicy = new SMBFilenamePolicy(resourceId, bucketMetadata.getFilenamePrefix(), str);
            this.tempDirectory = resourceId2;
            this.fileOperations = fileOperations;
            this.sorterMemoryMb = i;
            this.keyCacheSize = i2;
            this.verifyKeyExtraction = z;
            this.valueCoder = coder;
        }

        /**
         * Buckets the keyed input, optionally attaches the key-verification branch, and hands
         * over to {@link SortedBucketSink#sink}.
         *
         * @throws IllegalArgumentException if the input is not bounded
         */
        public final WriteResult expand(PCollection<KV<K, V>> pCollection) {
            Preconditions.checkArgument(pCollection.isBounded() == PCollection.IsBounded.BOUNDED, "SortedBucketSink cannot be applied to a non-bounded PCollection");

            // Encode-only coder: it serializes just the value of each KV. The decode function
            // is deliberately null because this coder is only handed to ExtractKeys for encoding.
            Coder<KV<K, V>> valueOnlyCoder = DelegateCoder.of(this.valueCoder, KV::getValue, null);
            PCollection<KV<BucketShardId, KV<byte[], byte[]>>> bucketed =
                pCollection.apply(
                    "ExtractKeys",
                    ParDo.of(ExtractKeys.of(this.bucketMetadata, KV::getKey, valueOnlyCoder, this.keyCacheSize)));

            if (this.verifyKeyExtraction) {
                pCollection
                    .apply("VerifySample", Sample.any(100L))
                    .apply("VerifyKeyFn", ParDo.of(new DoFn<KV<K, V>, Void>() {
                        @DoFn.ProcessElement
                        public void processElement(DoFn<KV<K, V>, Void>.ProcessContext processContext) throws Exception {
                            KV<K, V> element = processContext.element();
                            K extractedKey = SortedBucketPreKeyedSink.this.bucketMetadata.extractKey(element.getValue());
                            // Keys are compared by their encoded bytes, with nulls allowed.
                            Coder<K> keyCoder = NullableCoder.of(SortedBucketPreKeyedSink.this.bucketMetadata.getKeyCoder());
                            byte[] expected = CoderUtils.encodeToByteArray(keyCoder, extractedKey);
                            byte[] actual = CoderUtils.encodeToByteArray(keyCoder, element.getKey());
                            if (!Arrays.equals(expected, actual)) {
                                throw new RuntimeException("BucketMetadata's extractKey fn did not match pre-keyed PCollection");
                            }
                        }
                    }));
            }

            return SortedBucketSink.sink(bucketed, getName(), this.valueCoder, this.sorterMemoryMb, this.filenamePolicy, this.fileOperations, this.bucketMetadata, this.tempDirectory);
        }

        // The decompiled `$deserializeLambda$` helper was removed: it was not valid Java
        // (`boolean z = -1`, `case true`) and is the method javac synthesizes itself for
        // classes containing serializable lambdas / method references.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteOperation.class */
    /**
     * Writes each sorted bucket/shard to a temporary file and then finalizes the files with
     * {@link RenameBuckets}, exposing the outcome as a {@link WriteResult}.
     *
     * @param <V> record type written to the bucket files
     */
    public static class WriteOperation<V> extends PTransform<PCollection<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>>, WriteResult> {
        private final SMBFilenamePolicy filenamePolicy;
        private final BucketMetadata<?, V> bucketMetadata;
        private final FileOperations<V> fileOperations;
        private final ResourceId tempDirectory;
        private final Coder<V> valueCoder;

        WriteOperation(SMBFilenamePolicy sMBFilenamePolicy, BucketMetadata<?, V> bucketMetadata, FileOperations<V> fileOperations, ResourceId resourceId, Coder<V> coder) {
            this.filenamePolicy = sMBFilenamePolicy;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
            this.tempDirectory = resourceId;
            this.valueCoder = coder;
        }

        /** Chains the "WriteTempFiles" and "FinalizeTempFiles" steps. */
        public WriteResult expand(PCollection<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>> pCollection) {
            PCollection<KV<BucketShardId, ResourceId>> tempFiles =
                pCollection.apply(
                    "WriteTempFiles",
                    ParDo.of(
                        new WriteTempFiles<>(
                            this.filenamePolicy.forTempFiles(this.tempDirectory),
                            this.bucketMetadata,
                            this.fileOperations,
                            this.valueCoder)));

            PCollectionTuple finalized =
                tempFiles.apply(
                    "FinalizeTempFiles",
                    new RenameBuckets<>(
                        this.filenamePolicy.forTempFiles(this.tempDirectory).getDirectory(),
                        this.filenamePolicy.forDestination(),
                        this.bucketMetadata,
                        this.fileOperations));

            return WriteResult.fromTuple(finalized);
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteResult.class */
    /**
     * Output of the sink: the written metadata file and the written bucket files, each as a
     * {@link PCollection}.
     */
    public static class WriteResult implements POutput {
        private final Pipeline pipeline;
        private final PCollection<ResourceId> writtenMetadata;
        private final PCollection<KV<BucketShardId, ResourceId>> writtenFiles;

        WriteResult(Pipeline pipeline, PCollection<ResourceId> pCollection, PCollection<KV<BucketShardId, ResourceId>> pCollection2) {
            this.pipeline = pipeline;
            this.writtenMetadata = pCollection;
            this.writtenFiles = pCollection2;
        }

        /**
         * Builds a result from a tuple carrying the {@code "writtenMetadata"} and
         * {@code "writtenBuckets"} tags, setting the coder on both collections.
         */
        public static WriteResult fromTuple(PCollectionTuple pCollectionTuple) {
            Pipeline owner = pCollectionTuple.getPipeline();

            TupleTag<ResourceId> metadataTag = new TupleTag<>("writtenMetadata");
            PCollection<ResourceId> metadata =
                pCollectionTuple.get(metadataTag).setCoder(ResourceIdCoder.of());

            TupleTag<KV<BucketShardId, ResourceId>> bucketsTag = new TupleTag<>("writtenBuckets");
            PCollection<KV<BucketShardId, ResourceId>> files =
                pCollectionTuple
                    .get(bucketsTag)
                    .setCoder(KvCoder.of(BucketShardId.BucketShardIdCoder.of(), ResourceIdCoder.of()));

            return new WriteResult(owner, metadata, files);
        }

        public Pipeline getPipeline() {
            return this.pipeline;
        }

        /** Exposes both collections under the tags {@code "WrittenMetadata"} and {@code "WrittenFiles"}. */
        public Map<TupleTag<?>, PValue> expand() {
            return ImmutableMap.<TupleTag<?>, PValue>of(
                new TupleTag<ResourceId>("WrittenMetadata"), this.writtenMetadata,
                new TupleTag<KV<BucketShardId, ResourceId>>("WrittenFiles"), this.writtenFiles);
        }

        /** Intentionally a no-op. */
        public void finishSpecifyingOutput(String str, PInput pInput, PTransform<?, ?> pTransform) {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/extensions/smb/SortedBucketSink$WriteTempFiles.class */
    /**
     * Writes the sorted records of one bucket/shard to a temporary file and emits the
     * bucket/shard id together with that file.
     *
     * @param <V> record type; values arrive encoded and are decoded with the value coder
     */
    public static class WriteTempFiles<V> extends DoFn<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>, KV<BucketShardId, ResourceId>> {
        private final SMBFilenamePolicy.FileAssignment fileAssignment;
        private final BucketMetadata bucketMetadata;
        private final FileOperations<V> fileOperations;
        private final Coder<V> valueCoder;

        WriteTempFiles(SMBFilenamePolicy.FileAssignment fileAssignment, BucketMetadata bucketMetadata, FileOperations<V> fileOperations, Coder<V> coder) {
            this.fileAssignment = fileAssignment;
            this.bucketMetadata = bucketMetadata;
            this.fileOperations = fileOperations;
            this.valueCoder = coder;
        }

        /**
         * Decodes and writes every value of the group, in iteration order, then outputs the
         * temp file location.
         *
         * @throws IOException if the writer cannot be created or closed
         * @throws RuntimeException if decoding or writing a record fails; the temp file is
         *     deleted first (best effort)
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>, KV<BucketShardId, ResourceId>>.ProcessContext processContext) throws IOException {
            KV<BucketShardId, Iterable<KV<byte[], byte[]>>> element = processContext.element();
            BucketShardId bucketShardId = element.getKey();
            Iterable<KV<byte[], byte[]>> records = element.getValue();
            ResourceId tmpFile = this.fileAssignment.forBucket(bucketShardId, this.bucketMetadata);
            SortedBucketSink.LOG.info("Writing sorted-bucket {} to temporary file {}", bucketShardId, tmpFile);

            // The decompiled close/addSuppressed sequence is javac's expansion of
            // try-with-resources; written this way the writer is closed exactly once.
            try (FileOperations.Writer<V> writer = this.fileOperations.createWriter(tmpFile)) {
                records.forEach(kv -> {
                    try {
                        writer.write(CoderUtils.decodeFromByteArray(this.valueCoder, kv.getValue()));
                    } catch (IOException e) {
                        SortedBucketSink.cleanupTempFiles(e, Collections.singleton(tmpFile));
                        throw new RuntimeException("Failed to write sorted-bucket file " + bucketShardId, e);
                    }
                });
            }
            // Emitted only after the writer has been closed successfully.
            processContext.output(KV.of(bucketShardId, tmpFile));
        }

        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.delegate(this.fileAssignment);
            builder.delegate(this.fileOperations);
        }
    }

    /**
     * Creates a sink without key caching; equivalent to the seven-argument constructor with a
     * key cache size of {@code 0}.
     */
    public SortedBucketSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i) {
        this(
            bucketMetadata,
            resourceId,
            resourceId2,
            str,
            fileOperations,
            i,
            0);
    }

    /**
     * Creates a sink.
     *
     * @param bucketMetadata bucketing metadata; also supplies the filename prefix
     * @param resourceId first argument of the {@link SMBFilenamePolicy} built here
     * @param resourceId2 temporary directory
     * @param str third argument of the {@link SMBFilenamePolicy} built here
     * @param fileOperations reader/writer factory for the bucket files
     * @param i sorter memory, in MB
     * @param i2 key cache size; {@code 0} disables the cache
     */
    public SortedBucketSink(BucketMetadata<K, V> bucketMetadata, ResourceId resourceId, ResourceId resourceId2, String str, FileOperations<V> fileOperations, int i, int i2) {
        String filenamePrefix = bucketMetadata.getFilenamePrefix();
        this.filenamePolicy = new SMBFilenamePolicy(resourceId, filenamePrefix, str);
        this.bucketMetadata = bucketMetadata;
        this.fileOperations = fileOperations;
        this.tempDirectory = resourceId2;
        this.keyCacheSize = i2;
        this.sorterMemoryMb = i;
    }

    /**
     * Buckets the input by the key the {@link BucketMetadata} extracts from each record and
     * hands over to {@link #sink}. Records are encoded with the input collection's coder.
     *
     * @throws IllegalArgumentException if the input is not bounded
     */
    public final WriteResult expand(PCollection<V> pCollection) {
        Preconditions.checkArgument(pCollection.isBounded() == PCollection.IsBounded.BOUNDED, "SortedBucketSink cannot be applied to a non-bounded PCollection");
        Coder<V> inputCoder = pCollection.getCoder();
        PCollection<KV<BucketShardId, KV<byte[], byte[]>>> bucketed =
            pCollection.apply(
                "ExtractKeys",
                ParDo.of(ExtractKeys.of(this.bucketMetadata, this.bucketMetadata::extractKey, inputCoder, this.keyCacheSize)));
        return sink(bucketed, getName(), inputCoder, this.sorterMemoryMb, this.filenamePolicy, this.fileOperations, this.bucketMetadata, this.tempDirectory);
    }

    /**
     * Shared tail of the sink: groups the encoded records by bucket/shard, sorts each group
     * by key bytes and writes the bucket files.
     *
     * @param pCollection {@code (bucketShardId, (sortKeyBytes, recordBytes))} pairs
     * @param str prefix for the sorting counters
     * @param coder decodes record bytes when the files are written
     * @param i sorter memory, in MB
     * @param sMBFilenamePolicy naming policy for temp and destination files
     * @param fileOperations reader/writer factory for the bucket files
     * @param bucketMetadata bucketing metadata
     * @param resourceId temporary directory
     * @return the written metadata and bucket files
     */
    public static <KeyT, ValueT> WriteResult sink(PCollection<KV<BucketShardId, KV<byte[], byte[]>>> pCollection, String str, Coder<ValueT> coder, int i, SMBFilenamePolicy sMBFilenamePolicy, FileOperations<ValueT> fileOperations, BucketMetadata<KeyT, ValueT> bucketMetadata, ResourceId resourceId) {
        BufferedExternalSorter.Options sorterOptions =
            BufferedExternalSorter.options()
                .withExternalSorterType(ExternalSorter.Options.SorterType.NATIVE)
                .withMemoryMB(i);

        PCollection<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>> grouped =
            pCollection
                .setCoder(KvCoder.of(BucketShardId.BucketShardIdCoder.of(), KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of())))
                .apply("GroupByKey", GroupByKey.create());

        PCollection<KV<BucketShardId, Iterable<KV<byte[], byte[]>>>> sorted =
            grouped.apply("SortValues", ParDo.of(new SortBytesDoFn<BucketShardId>(str, sorterOptions)));

        return sorted.apply(
            "WriteOperation",
            new WriteOperation<>(sMBFilenamePolicy, bucketMetadata, fileOperations, resourceId, coder));
    }

    /**
     * Best-effort deletion of the given files after a failure.
     *
     * @param exc the failure being handled; a deletion error is attached to it as suppressed
     *     instead of being thrown
     * @param collection files to delete; missing files are ignored
     */
    static void cleanupTempFiles(Exception exc, Collection<ResourceId> collection) {
        String fileList = collection.stream().map(Object::toString).collect(Collectors.joining(", "));
        LOG.info("Deleting temporary file {}", fileList);
        try {
            FileSystems.delete(collection, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
        } catch (IOException deleteFailure) {
            exc.addSuppressed(deleteFailure);
        }
    }

    // The decompiled `$deserializeLambda$` helper that stood here was removed. It was not valid
    // Java (`boolean z = -1`, a `switch` over a boolean) and it is the method javac synthesizes
    // itself for classes containing serializable lambdas / method references, such as the
    // `extractKey` method reference used in expand().
}
