package com.google.cloud.pubsublite.spark;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.spark.internal.PublisherFactory;
import com.google.common.flogger.GoogleLogger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslDataWriter.class */
public class PslDataWriter implements DataWriter<InternalRow> {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final long partitionId;
    private final long taskId;
    private final long epochId;
    private final StructType inputSchema;
    private final PublisherFactory publisherFactory;

    @GuardedBy("this")
    private Optional<Publisher<MessageMetadata>> publisher = Optional.empty();

    @GuardedBy("this")
    private final List<ApiFuture<MessageMetadata>> futures = new ArrayList();

    public PslDataWriter(long j, long j2, long j3, StructType structType, PublisherFactory publisherFactory) {
        this.partitionId = j;
        this.taskId = j2;
        this.epochId = j3;
        this.inputSchema = structType;
        this.publisherFactory = publisherFactory;
    }

    public synchronized void write(InternalRow internalRow) {
        if (!this.publisher.isPresent() || this.publisher.get().state() != ApiService.State.RUNNING) {
            this.publisher = Optional.of(this.publisherFactory.newPublisher());
        }
        this.futures.add(this.publisher.get().publish((Message) Objects.requireNonNull(PslSparkUtils.toPubSubMessage(this.inputSchema, internalRow))));
    }

    public synchronized WriterCommitMessage commit() throws IOException {
        Iterator<ApiFuture<MessageMetadata>> it = this.futures.iterator();
        while (it.hasNext()) {
            try {
                it.next().get();
            } catch (InterruptedException | ExecutionException e) {
                this.publisher = Optional.empty();
                throw new IOException(e);
            }
        }
        log.atInfo().log("All writes for partitionId:%d, taskId:%d, epochId:%d succeeded, committing...", Long.valueOf(this.partitionId), Long.valueOf(this.taskId), Long.valueOf(this.epochId));
        return PslWriterCommitMessage.create(this.futures.size());
    }

    public synchronized void abort() {
        log.atWarning().log("One or more writes for partitionId:%d, taskId:%d, epochId:%d failed, aborted.", Long.valueOf(this.partitionId), Long.valueOf(this.taskId), Long.valueOf(this.epochId));
    }

    public void close() throws IOException {
    }
}
