package com.spotify.scio.bigtable;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableService;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PTransform} that writes keyed row mutations to a single Bigtable table.
 *
 * <p>{@link #expand} first batches the input with {@link #createBulkShards}: every element is
 * tagged with a random shard id, the global window is triggered repeatedly on processing time
 * ({@code flushInterval} after the first element of a pane), the elements are grouped by shard
 * and the shard key is dropped. Each resulting group is then written by a
 * {@link BigtableBulkWriterFn}.
 */
public class BigtableBulkWriter extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableBulkWriter.class);

    /** Upper bound on the number of individual failures spelled out in one error report. */
    private static final int MAX_REPORTED_FAILURES = 10;

    private final BigtableOptions bigtableOptions;
    private final String tableName;
    private final int numOfShards;
    private final Duration flushInterval;

    /** Pairs every input element with a random shard id in {@code [0, numOfShards)}. */
    public static class AssignToShard
            extends DoFn<KV<ByteString, Iterable<Mutation>>, KV<Long, KV<ByteString, Iterable<Mutation>>>> {
        private final int numOfShards;

        /**
         * @param numOfShards number of distinct shard ids to spread elements over
         * @throws IllegalArgumentException if {@code numOfShards} is not positive
         */
        AssignToShard(int numOfShards) {
            // ThreadLocalRandom.nextLong(bound) rejects non-positive bounds; fail here, while the
            // pipeline is being built, instead of on the first element processed.
            if (numOfShards <= 0) {
                throw new IllegalArgumentException("numOfShards must be positive, got " + numOfShards);
            }
            this.numOfShards = numOfShards;
        }

        /** Emits the element unchanged, keyed by a freshly drawn shard id. */
        @ProcessElement
        public void processElement(ProcessContext context) {
            long shard = ThreadLocalRandom.current().nextLong(numOfShards);
            context.output(KV.of(shard, context.element()));
        }
    }

    /**
     * Writes every row of each incoming group through a Bigtable writer.
     *
     * <p>This is an inner class: the table name and options are read from the enclosing
     * {@link BigtableBulkWriter} instance. The writer is opened in {@link #startBundle} only when
     * none is open yet and is released in {@link #tearDown}. Write failures are collected from
     * the completion callbacks and rethrown from the next {@link #processElement} call or from
     * {@link #finishBundle}.
     */
    public class BigtableBulkWriterFn extends DoFn<Iterable<KV<ByteString, Iterable<Mutation>>>, Void> {
        private BigtableService.Writer bigtableWriter;
        private long recordsWritten;
        // Filled from write-completion callbacks and drained by checkForFailures.
        private final ConcurrentLinkedQueue<BigtableWriteException> failures = new ConcurrentLinkedQueue<>();

        /** Failure of a single row write; the message names the row key and its mutations. */
        public class BigtableWriteException extends IOException {
            /**
             * @param row the row key and mutations whose write failed
             * @param cause the error reported for that write
             */
            public BigtableWriteException(KV<ByteString, Iterable<Mutation>> row, Throwable cause) {
                super(
                        String.format(
                                "Error mutating row %s with mutations %s",
                                row.getKey().toStringUtf8(), row.getValue()),
                        cause);
            }
        }

        public BigtableBulkWriterFn() {
        }

        /**
         * Opens a writer for the target table if none is open and resets the record counter.
         *
         * @throws IOException if the writer cannot be opened
         */
        @StartBundle
        public void startBundle(StartBundleContext startBundleContext) throws IOException {
            if (bigtableWriter == null) {
                bigtableWriter = new BigtableServiceHelper(bigtableOptions).openForWriting(tableName);
            }
            recordsWritten = 0L;
        }

        /**
         * Submits every row of the current group to the writer.
         *
         * @throws IOException if earlier writes have reported failures
         */
        @ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            // Surface failures from previously submitted writes before queueing more work.
            checkForFailures(failures);
            for (KV<ByteString, Iterable<Mutation>> row : context.element()) {
                bigtableWriter
                        .writeRecord(row)
                        .whenComplete(
                                (response, error) -> {
                                    if (error != null) {
                                        failures.add(new BigtableWriteException(row, error));
                                    }
                                });
                recordsWritten++;
            }
        }

        /**
         * Flushes the writer, then reports any failures collected during the bundle.
         *
         * @throws IOException if any write of this bundle failed
         */
        @FinishBundle
        public void finishBundle() throws Exception {
            bigtableWriter.flush();
            checkForFailures(failures);
            LOG.debug("Wrote {} records", recordsWritten);
        }

        /** Closes the writer, if one is open, and forgets it even when closing fails. */
        @Teardown
        public void tearDown() throws Exception {
            if (bigtableWriter != null) {
                try {
                    bigtableWriter.close();
                } finally {
                    // Never keep a reference to a writer whose close() was attempted.
                    bigtableWriter = null;
                }
            }
        }

        /** Adds the current record counter as the "Records Written" display item. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("Records Written", recordsWritten));
        }

        /**
         * Throws if {@code pending} holds any failure; returns normally otherwise.
         *
         * <p>At most {@link #MAX_REPORTED_FAILURES} failures are removed from the queue, listed in
         * the message and attached as suppressed exceptions; the rest stay queued and are only
         * counted.
         *
         * @param pending queue of failures recorded by write callbacks
         * @throws IOException summarising the failures, if there are any
         */
        private void checkForFailures(ConcurrentLinkedQueue<BigtableWriteException> pending) throws IOException {
            if (pending.isEmpty()) {
                return;
            }
            StringBuilder details = new StringBuilder();
            ArrayList<BigtableWriteException> reported = new ArrayList<>();
            BigtableWriteException failure;
            // The size check comes first so that a failure beyond the limit is never dequeued.
            while (reported.size() < MAX_REPORTED_FAILURES && (failure = pending.poll()) != null) {
                details.append("\n").append(failure.getMessage());
                if (failure.getCause() != null) {
                    details.append(": ").append(failure.getCause().getMessage());
                }
                reported.add(failure);
            }
            String message =
                    String.format(
                            "At least %d errors occurred writing to Bigtable. First %d errors: %s",
                            reported.size() + pending.size(), reported.size(), details);
            LOG.error(message);
            IOException summary = new IOException(message);
            for (BigtableWriteException each : reported) {
                summary.addSuppressed(each);
            }
            throw summary;
        }
    }

    /**
     * @param tableName name of the table passed to the writer when it is opened
     * @param bigtableOptions options used to build the Bigtable service helper
     * @param numOfShards number of random shards the input is spread over before grouping
     * @param flushInterval processing-time delay after the first element of a pane before the
     *     pane fires
     */
    public BigtableBulkWriter(String tableName, BigtableOptions bigtableOptions, int numOfShards, Duration flushInterval) {
        this.bigtableOptions = bigtableOptions;
        this.tableName = tableName;
        this.numOfShards = numOfShards;
        this.flushInterval = flushInterval;
    }

    /** Shards and groups the input, then writes every group with a {@link BigtableBulkWriterFn}. */
    @Override
    public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
        createBulkShards(input, numOfShards, flushInterval)
                .apply("Bigtable BulkWrite", ParDo.of(new BigtableBulkWriterFn()));
        return PDone.in(input.getPipeline());
    }

    /**
     * Groups the input into batches of rows.
     *
     * @param input rows to batch
     * @param numOfShards number of random shard keys to group by
     * @param flushInterval delay after the first element of a pane before the trigger fires
     * @return one element per shard and fired pane, holding the rows grouped under that shard
     * @throws IllegalArgumentException if {@code numOfShards} is not positive
     */
    @VisibleForTesting
    static PCollection<Iterable<KV<ByteString, Iterable<Mutation>>>> createBulkShards(
            PCollection<KV<ByteString, Iterable<Mutation>>> input, int numOfShards, Duration flushInterval) {
        return input
                .apply("Assign To Shard", ParDo.of(new AssignToShard(numOfShards)))
                .apply(
                        "Window",
                        // The explicit type argument is required: as the receiver of a method
                        // chain, into(...) has no target type to infer the element type from.
                        Window.<KV<Long, KV<ByteString, Iterable<Mutation>>>>into(new GlobalWindows())
                                .triggering(
                                        Repeatedly.forever(
                                                AfterProcessingTime.pastFirstElementInPane()
                                                        .plusDelayOf(flushInterval)))
                                .discardingFiredPanes()
                                .withAllowedLateness(Duration.ZERO))
                .apply("Group By Shard", GroupByKey.create())
                .apply(
                        "Gets Mutations",
                        ParDo.of(
                                new DoFn<
                                        KV<Long, Iterable<KV<ByteString, Iterable<Mutation>>>>,
                                        Iterable<KV<ByteString, Iterable<Mutation>>>>() {
                                    // The shard key only served the grouping; emit the rows alone.
                                    @ProcessElement
                                    public void process(ProcessContext context) {
                                        context.output(context.element().getValue());
                                    }
                                }));
    }
}
