package io.cdap.mmds.manager;

import com.google.common.collect.ImmutableList;
import io.cdap.cdap.api.ServiceDiscoverer;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.plugin.PluginContext;
import io.cdap.cdap.api.spark.sql.DataFrames;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.InvalidEntry;
import io.cdap.cdap.etl.api.Transform;
import io.cdap.cdap.etl.api.TransformContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

/* loaded from: input_file:io/cdap/mmds/manager/WranglerFunction.class */
/**
 * Spark flat-map function that pushes each input {@link Row} through the plugin registered under
 * the id {@code "wrangler"} and returns the rows that plugin emitted.
 *
 * <p>Column 0 of every input row is wrapped in a single-field record ({@code body}) before being
 * handed to the plugin. Records emitted by the plugin are converted back to Spark rows using the
 * Spark type derived from the output {@link Schema} supplied at construction time.
 *
 * <p>The plugin instance and the emitter are {@code transient} and are created lazily on the first
 * {@link #call(Row)}, so they are never part of this function's serialized form.
 */
public class WranglerFunction implements FlatMapFunction<Row, Row> {
    /** Schema of the record handed to the plugin: one nullable string field named {@code body}. */
    private static final Schema TEXT_SCHEMA = Schema.recordOf("textRecord", new Schema.Field[]{Schema.Field.of("body", Schema.nullableOf(Schema.of(Schema.Type.STRING)))});
    private final PluginContext pluginContext;
    /** Schema of the records the plugin is expected to emit. */
    private final Schema schema;
    private final ServiceDiscoverer serviceDiscoverer;
    private transient TranslatingEmitter emitter;
    private transient Transform<StructuredRecord, StructuredRecord> wrangler;

    /**
     * Emitter that converts every emitted {@link StructuredRecord} into a Spark {@link Row} and
     * buffers it until {@link #reset()} is called. Alerts and errors are discarded.
     */
    public static class TranslatingEmitter implements Emitter<StructuredRecord> {
        private final List<Row> outputRecords;
        private final StructType sparkSchema;

        private TranslatingEmitter(StructType structType) {
            this.sparkSchema = structType;
            this.outputRecords = new ArrayList<>();
        }

        /** Converts the record to a row with the configured Spark schema and buffers it. */
        @Override // io.cdap.cdap.etl.api.Emitter
        public void emit(StructuredRecord structuredRecord) {
            this.outputRecords.add(DataFrames.toRow(structuredRecord, this.sparkSchema));
        }

        /** Alerts are intentionally dropped; this emitter only collects output records. */
        @Override // io.cdap.cdap.etl.api.AlertEmitter
        public void emitAlert(Map<String, String> map) {
        }

        /** Error records are intentionally dropped; this emitter only collects output records. */
        @Override // io.cdap.cdap.etl.api.ErrorEmitter
        public void emitError(InvalidEntry<StructuredRecord> invalidEntry) {
        }

        /** Discards every buffered row. */
        public void reset() {
            this.outputRecords.clear();
        }

        /**
         * Returns an immutable snapshot of the rows buffered since the last {@link #reset()}.
         * The snapshot is unaffected by later calls to {@link #reset()} or {@link #emit}.
         */
        public List<Row> getRecords() {
            return ImmutableList.copyOf(this.outputRecords);
        }
    }

    /**
     * @param schema            schema of the records emitted by the plugin; used to derive the
     *                          Spark type of the output rows
     * @param pluginContext     context used to instantiate the {@code "wrangler"} plugin
     * @param serviceDiscoverer passed to the {@code WranglerContext} the plugin is initialized with
     */
    public WranglerFunction(Schema schema, PluginContext pluginContext, ServiceDiscoverer serviceDiscoverer) {
        this.schema = schema;
        this.pluginContext = pluginContext;
        this.serviceDiscoverer = serviceDiscoverer;
    }

    /**
     * Transforms one input row.
     *
     * @param row input row; only the value at index 0 is read and passed on as the {@code body}
     *            field
     * @return iterator over the rows the plugin emitted for this input (possibly empty)
     * @throws Exception if the plugin cannot be created or initialized, or if the transform fails
     */
    @Override
    public Iterator<Row> call(Row row) throws Exception {
        ensureInitialized();
        StructuredRecord input = StructuredRecord.builder(TEXT_SCHEMA).set("body", row.get(0)).build();
        // Drop whatever the previous call buffered; the iterator returned back then was taken
        // from an immutable snapshot, so it is not affected.
        this.emitter.reset();
        this.wrangler.transform(input, this.emitter);
        return this.emitter.getRecords().iterator();
    }

    /**
     * Creates and initializes the plugin and the emitter on first use.
     *
     * <p>Both are built in locals and only stored in the fields once every step has succeeded.
     * If initialization throws, the fields stay {@code null}, so a later call retries instead of
     * hitting a half-initialized plugin or a {@code null} emitter.
     */
    private void ensureInitialized() throws Exception {
        if (this.wrangler != null) {
            return;
        }
        TransformContext context = new WranglerContext(this.pluginContext, this.serviceDiscoverer);
        // The plugin is looked up by id, so its static type is not known to the compiler.
        @SuppressWarnings("unchecked")
        Transform<StructuredRecord, StructuredRecord> created =
            (Transform<StructuredRecord, StructuredRecord>) this.pluginContext.newPluginInstance("wrangler");
        created.initialize(context);
        TranslatingEmitter createdEmitter = new TranslatingEmitter(DataFrames.toDataType(this.schema));
        // Publish the emitter first: the guard above tests `wrangler`, so it must be set last.
        this.emitter = createdEmitter;
        this.wrangler = created;
    }
}
