package co.cask.cdap.examples.streamconversion;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSetArguments;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/examples/streamconversion/StreamConversionMapReduce.class */
public class StreamConversionMapReduce extends AbstractMapReduce {
    private static final Logger LOG = LoggerFactory.getLogger(StreamConversionMapReduce.class);
    private static final Schema SCHEMA = new Schema.Parser().parse(StreamConversionApp.SCHEMA_STRING);
    private final Map<String, String> dsArguments = Maps.newHashMap();

    /* loaded from: input_file:co/cask/cdap/examples/streamconversion/StreamConversionMapReduce$StreamConversionMapper.class */
    public static class StreamConversionMapper extends Mapper<LongWritable, StreamEvent, AvroKey<GenericRecord>, NullWritable> {
        public void map(LongWritable longWritable, StreamEvent streamEvent, Mapper<LongWritable, StreamEvent, AvroKey<GenericRecord>, NullWritable>.Context context) throws IOException, InterruptedException {
            context.write(new AvroKey(new GenericRecordBuilder(StreamConversionMapReduce.SCHEMA).set("time", Long.valueOf(streamEvent.getTimestamp())).set("body", Bytes.toString((ByteBuffer) streamEvent.getBody())).build()), NullWritable.get());
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (StreamEvent) obj2, (Mapper<LongWritable, StreamEvent, AvroKey<GenericRecord>, NullWritable>.Context) context);
        }
    }

    public void configure() {
        setDescription("Job to read a chunk of stream events and write them to a FileSet");
        setMapperResources(new Resources(512));
    }

    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        Job job = (Job) mapReduceContext.getHadoopJob();
        job.setMapperClass(StreamConversionMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        AvroJob.setOutputKeySchema(job, SCHEMA);
        long logicalStartTime = mapReduceContext.getLogicalStartTime();
        StreamBatchReadable.useStreamInput(mapReduceContext, "events", logicalStartTime - TimeUnit.MINUTES.toMillis(5L), logicalStartTime);
        TimePartitionedFileSetArguments.setOutputPartitionTime(this.dsArguments, logicalStartTime);
        TimePartitionedFileSet dataset = mapReduceContext.getDataset("converted", this.dsArguments);
        mapReduceContext.addOutput("converted", dataset);
        LOG.info("Output location for new partition is: {}", dataset.getEmbeddedFileSet().getOutputLocation());
    }
}
