/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.scio.bigtable;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableService;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceHelper;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PTransform} that writes keyed Bigtable mutations to a single table in bulk.
 *
 * <p>Incoming rows are spread randomly over a fixed number of shards, re-windowed into the
 * global window with a repeating processing-time trigger, grouped by shard, and each resulting
 * group is then written through a {@link BigtableService.Writer}.
 */
public class BigtableBulkWriter
        extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
    private static final Logger LOG = LoggerFactory.getLogger(BigtableBulkWriter.class);

    /** Maximum number of individual failures spelled out in a single error message. */
    private static final int MAX_REPORTED_FAILURES = 10;

    private final BigtableOptions bigtableOptions;
    private final String tableName;
    private final int numOfShards;
    private final Duration flushInterval;

    /**
     * Creates a bulk writer.
     *
     * @param tableName name of the table passed to {@code openForWriting}
     * @param bigtableOptions options used to build the Bigtable service helper
     * @param numOfShards number of shards rows are randomly distributed over; must be positive
     * @param flushInterval processing-time delay after the first element of a pane before the
     *     pane fires
     */
    public BigtableBulkWriter(
            String tableName,
            BigtableOptions bigtableOptions,
            int numOfShards,
            Duration flushInterval) {
        this.bigtableOptions = bigtableOptions;
        this.tableName = tableName;
        this.numOfShards = numOfShards;
        this.flushInterval = flushInterval;
    }

    /**
     * Shards and groups the input, then applies the writing {@link DoFn} to every group.
     *
     * @param input rows to write, keyed by row key
     * @return a {@link PDone} bound to the pipeline of {@code input}
     */
    @Override
    public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
        createBulkShards(input, numOfShards, flushInterval)
                .apply(
                        "Bigtable BulkWrite",
                        ParDo.of(new BigtableBulkWriterFn(bigtableOptions, tableName)));
        return PDone.in(input.getPipeline());
    }

    /**
     * Batches rows into per-shard groups that are emitted on a processing-time trigger.
     *
     * @param input rows to batch
     * @param numOfShards number of random shards; must be positive
     * @param flushInterval delay after the first element in a pane before that pane fires
     * @return a collection in which every element is the group of rows of one shard for one
     *     fired pane
     * @throws IllegalArgumentException if {@code numOfShards} is not positive
     */
    @VisibleForTesting
    static PCollection<Iterable<KV<ByteString, Iterable<Mutation>>>> createBulkShards(
            PCollection<KV<ByteString, Iterable<Mutation>>> input,
            int numOfShards,
            Duration flushInterval) {
        return input
                .apply("Assign To Shard", ParDo.of(new AssignToShard(numOfShards)))
                .apply(
                        "Window",
                        Window.<KV<Long, KV<ByteString, Iterable<Mutation>>>>into(
                                        new GlobalWindows())
                                .triggering(
                                        Repeatedly.forever(
                                                AfterProcessingTime.pastFirstElementInPane()
                                                        .plusDelayOf(flushInterval)))
                                .discardingFiredPanes()
                                .withAllowedLateness(Duration.ZERO))
                .apply(
                        "Group By Shard",
                        GroupByKey.<Long, KV<ByteString, Iterable<Mutation>>>create())
                .apply(
                        "Gets Mutations",
                        ParDo.of(
                                new DoFn<
                                        KV<Long, Iterable<KV<ByteString, Iterable<Mutation>>>>,
                                        Iterable<KV<ByteString, Iterable<Mutation>>>>() {

                                    // The shard id only existed to drive the grouping; drop it.
                                    @DoFn.ProcessElement
                                    public void process(ProcessContext context) {
                                        context.output(context.element().getValue());
                                    }
                                }));
    }

    /** Tags every row with a uniformly random shard id in {@code [0, numOfShards)}. */
    static class AssignToShard
            extends DoFn<
                    KV<ByteString, Iterable<Mutation>>,
                    KV<Long, KV<ByteString, Iterable<Mutation>>>> {
        private final int numOfShards;

        /**
         * @param numOfShards exclusive upper bound of the generated shard ids
         * @throws IllegalArgumentException if {@code numOfShards} is not positive
         */
        AssignToShard(int numOfShards) {
            // Fail when the transform is built rather than on every element:
            // ThreadLocalRandom.nextLong(bound) rejects non-positive bounds.
            if (numOfShards <= 0) {
                throw new IllegalArgumentException(
                        "numOfShards must be positive, got " + numOfShards);
            }
            this.numOfShards = numOfShards;
        }

        /** Emits the input row keyed by a randomly chosen shard id. */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) {
            long shard = ThreadLocalRandom.current().nextLong(numOfShards);
            context.output(KV.of(shard, context.element()));
        }
    }

    /**
     * Writes every row of each incoming group through a {@link BigtableService.Writer}.
     *
     * <p>Write failures are reported asynchronously by the writer; they are queued and surfaced
     * as an {@link IOException} on the next element or when the bundle finishes.
     *
     * <p>Declared static so that instances carry only the options and table name they need
     * instead of an implicit reference to the enclosing transform.
     */
    private static class BigtableBulkWriterFn
            extends DoFn<Iterable<KV<ByteString, Iterable<Mutation>>>, Void> {
        private final BigtableOptions bigtableOptions;
        private final String tableName;
        // Filled from the writer's completion callbacks, drained by checkForFailures().
        private final ConcurrentLinkedQueue<BigtableWriteException> failures =
                new ConcurrentLinkedQueue<>();
        private BigtableService.Writer bigtableWriter;
        private long recordsWritten;

        BigtableBulkWriterFn(BigtableOptions bigtableOptions, String tableName) {
            this.bigtableOptions = bigtableOptions;
            this.tableName = tableName;
        }

        /**
         * Opens the writer if none is open yet and resets the per-bundle record counter.
         *
         * @throws IOException if the writer cannot be opened
         */
        @DoFn.StartBundle
        public void startBundle(StartBundleContext context) throws IOException {
            if (bigtableWriter == null) {
                bigtableWriter =
                        new BigtableServiceHelper(bigtableOptions).openForWriting(tableName);
            }
            recordsWritten = 0L;
        }

        /**
         * Submits every row of the current group to the writer.
         *
         * @throws Exception if earlier writes have failed or a row cannot be submitted
         */
        @DoFn.ProcessElement
        public void processElement(ProcessContext context) throws Exception {
            // Surface failures of previously submitted rows before queueing more work.
            checkForFailures();
            for (KV<ByteString, Iterable<Mutation>> row : context.element()) {
                bigtableWriter
                        .writeRecord(row)
                        .whenComplete(
                                (response, error) -> {
                                    if (error != null) {
                                        failures.add(new BigtableWriteException(row, error));
                                    }
                                });
                ++recordsWritten;
            }
        }

        /**
         * Flushes the writer and fails the bundle if any write failed.
         *
         * @throws Exception if flushing fails or failures were recorded
         */
        @DoFn.FinishBundle
        public void finishBundle() throws Exception {
            bigtableWriter.flush();
            checkForFailures();
            LOG.debug("Wrote {} records", recordsWritten);
        }

        /**
         * Closes the writer, if one is open.
         *
         * @throws Exception if closing the writer fails
         */
        @DoFn.Teardown
        public void tearDown() throws Exception {
            if (bigtableWriter != null) {
                try {
                    bigtableWriter.close();
                } finally {
                    // Drop the reference even when close() throws so a writer whose close
                    // failed is never picked up again by startBundle().
                    bigtableWriter = null;
                }
            }
        }

        /** Adds the current value of the record counter to the display data. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("Records Written", recordsWritten));
        }

        /**
         * Throws if any write failure has been queued.
         *
         * <p>At most {@link #MAX_REPORTED_FAILURES} failures are removed from the queue; their
         * messages go into the exception message and the failures themselves are attached as
         * suppressed exceptions. Remaining failures stay queued and only contribute to the
         * reported count.
         *
         * @throws IOException if at least one failure was queued
         */
        private void checkForFailures() throws IOException {
            if (failures.isEmpty()) {
                return;
            }
            StringBuilder details = new StringBuilder();
            ArrayList<BigtableWriteException> reported = new ArrayList<>();
            int count = 0;
            while (count < MAX_REPORTED_FAILURES && !failures.isEmpty()) {
                BigtableWriteException failure = failures.remove();
                details.append("\n").append(failure.getMessage());
                if (failure.getCause() != null) {
                    details.append(": ").append(failure.getCause().getMessage());
                }
                reported.add(failure);
                ++count;
            }
            String message =
                    String.format(
                            "At least %d errors occurred writing to Bigtable. First %d errors: %s",
                            count + failures.size(), count, details.toString());
            LOG.error(message);
            IOException aggregate = new IOException(message);
            for (BigtableWriteException failure : reported) {
                aggregate.addSuppressed(failure);
            }
            throw aggregate;
        }

        /**
         * Failure of a single row write; the message names the row key and its mutations.
         *
         * <p>Static so that an exception instance does not hold on to the DoFn that created it.
         */
        static class BigtableWriteException extends IOException {
            public BigtableWriteException(
                    KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
                super(
                        String.format(
                                "Error mutating row %s with mutations %s",
                                record.getKey().toStringUtf8(), record.getValue()),
                        cause);
            }
        }
    }
}

