package com.google.cloud.pubsublite.spark;

import com.google.common.flogger.GoogleLogger;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.types.StructType;

/* loaded from: input_file:com/google/cloud/pubsublite/spark/PslStreamWriter.class */
public class PslStreamWriter implements StreamWriter {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final StructType inputSchema;
    private final PslWriteDataSourceOptions writeOptions;

    public PslStreamWriter(StructType structType, PslWriteDataSourceOptions pslWriteDataSourceOptions) {
        this.inputSchema = structType;
        this.writeOptions = pslWriteDataSourceOptions;
    }

    public void commit(long j, WriterCommitMessage[] writerCommitMessageArr) {
        log.atInfo().log("Committed %d messages for epochId:%d.", countMessages(writerCommitMessageArr), j);
    }

    public void abort(long j, WriterCommitMessage[] writerCommitMessageArr) {
        log.atWarning().log("Epoch id: %d is aborted, %d messages might have been published.", j, countMessages(writerCommitMessageArr));
    }

    private long countMessages(WriterCommitMessage[] writerCommitMessageArr) {
        long j = 0;
        for (WriterCommitMessage writerCommitMessage : writerCommitMessageArr) {
            if (writerCommitMessage instanceof PslWriterCommitMessage) {
                j += ((PslWriterCommitMessage) writerCommitMessage).numMessages();
            }
        }
        return j;
    }

    public DataWriterFactory<InternalRow> createWriterFactory() {
        return new PslDataWriterFactory(this.inputSchema, this.writeOptions);
    }
}
