package co.cask.cdap.examples.wikipedia;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.data.stream.StreamBatchReadable;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.mapreduce.AbstractMapReduce;
import co.cask.cdap.api.mapreduce.MapReduceContext;
import co.cask.cdap.api.workflow.Value;
import co.cask.cdap.api.workflow.WorkflowToken;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.google.inject.internal.asm.C$Opcodes;
import java.io.IOException;
import java.util.UUID;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

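/**
 * A MapReduce program that dumps events from a stream to a dataset. The input stream, the output
 * dataset and the mapper are chosen in {@code beforeSubmit}.
 *
 * <p>Decompiled from: input_file:co/cask/cdap/examples/wikipedia/StreamToDataset.class
 */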
public class StreamToDataset extends AbstractMapReduce {
    private static final Logger LOG = LoggerFactory.getLogger(StreamToDataset.class);
    private final String name;

    /* loaded from: input_file:co/cask/cdap/examples/wikipedia/StreamToDataset$PageTitleToDatasetMapper.class */
    public static final class PageTitleToDatasetMapper extends Mapper<LongWritable, StreamEvent, byte[], byte[]> {
        private final Gson gson = new Gson();

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        /* loaded from: input_file:co/cask/cdap/examples/wikipedia/StreamToDataset$PageTitleToDatasetMapper$Page.class */
        public static class Page {
            private final String name;
            private final String id;

            @SerializedName("created_time")
            private final String createdTime;

            Page(String str, String str2, String str3) {
                this.name = str;
                this.id = str2;
                this.createdTime = str3;
            }

            public String getName() {
                return this.name;
            }

            public String getId() {
                return this.id;
            }
        }

        protected void map(LongWritable longWritable, StreamEvent streamEvent, Mapper<LongWritable, StreamEvent, byte[], byte[]>.Context context) throws IOException, InterruptedException {
            Page page = (Page) this.gson.fromJson(Bytes.toString(streamEvent.getBody()), Page.class);
            context.write(Bytes.toBytes(page.getId()), Bytes.toBytes(page.getName()));
            context.getCounter("custom", "num.records").increment(1L);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (StreamEvent) obj2, (Mapper<LongWritable, StreamEvent, byte[], byte[]>.Context) context);
        }
    }

    /* loaded from: input_file:co/cask/cdap/examples/wikipedia/StreamToDataset$RawWikiDataToDatasetMapper.class */
    public static final class RawWikiDataToDatasetMapper extends Mapper<LongWritable, StreamEvent, byte[], byte[]> {
        protected void map(LongWritable longWritable, StreamEvent streamEvent, Mapper<LongWritable, StreamEvent, byte[], byte[]>.Context context) throws IOException, InterruptedException {
            context.write(Bytes.toBytes(UUID.randomUUID()), Bytes.toBytes(Bytes.toString(streamEvent.getBody())));
            context.getCounter("custom", "num.records").increment(1L);
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((LongWritable) obj, (StreamEvent) obj2, (Mapper<LongWritable, StreamEvent, byte[], byte[]>.Context) context);
        }
    }

    /**
     * Creates the program with the given name.
     *
     * @param str the name stored for this program; {@link #configure()} passes it to {@code setName}
     */
    public StreamToDataset(String str) {
        this.name = str;
    }

    // Value handed to Resources for the mappers. The decompiled code spelled this as the unrelated
    // bytecode flag C$Opcodes.ACC_INTERFACE, which is 0x0200 = 512.
    // NOTE(review): presumably megabytes of memory per the CDAP Resources(int) constructor — confirm.
    private static final int MAPPER_RESOURCES = 512;

    /**
     * Sets the program name (from the constructor argument), its description, and the resources
     * requested for each mapper.
     */
    @Override
    public void configure() {
        setName(this.name);
        setDescription("A MapReduce program that dumps events from a stream to a dataset.");
        setMapperResources(new Resources(MAPPER_RESOURCES));
    }

    /**
     * Configures the Hadoop job before submission: a map-only job whose mapper, input stream and
     * output dataset depend on the workflow token.
     *
     * <p>By default the job reads {@code pageTitleStream} with {@link PageTitleToDatasetMapper} and
     * writes to {@code pages}. When a workflow token is present and the {@code result} key recorded by
     * the {@code LikesToDataset} node is true, it instead reads {@code wikiStream} with
     * {@link RawWikiDataToDatasetMapper} and writes to {@code wikidata}.
     *
     * @param mapReduceContext the context supplying the Hadoop job and the workflow token
     * @throws Exception declared by the overridden method
     */
    @Override
    public void beforeSubmit(MapReduceContext mapReduceContext) throws Exception {
        Job job = mapReduceContext.getHadoopJob();
        // No reducers: mapper output goes straight to the output dataset.
        job.setNumReduceTasks(0);

        // The token is null-checked because it may be absent.
        WorkflowToken workflowToken = mapReduceContext.getWorkflowToken();
        Value likesResult = (workflowToken == null) ? null : workflowToken.get("result", "LikesToDataset");
        boolean useRawWikiData = likesResult != null && likesResult.getAsBoolean();

        Class<? extends Mapper> mapperClass;
        String inputStream;
        String outputDataset;
        if (useRawWikiData) {
            mapperClass = RawWikiDataToDatasetMapper.class;
            inputStream = "wikiStream";
            outputDataset = "wikidata";
        } else {
            mapperClass = PageTitleToDatasetMapper.class;
            inputStream = "pageTitleStream";
            outputDataset = "pages";
        }

        LOG.info("Using '{}' as the input stream and '{}' as the output dataset.", inputStream, outputDataset);
        job.setMapperClass(mapperClass);
        StreamBatchReadable.useStreamInput(mapReduceContext, inputStream);
        mapReduceContext.addOutput(outputDataset);
    }

    /**
     * Records the outcome of this run in the workflow token under the {@code result} key. Does nothing
     * when the context has no workflow token.
     *
     * @param z                the success flag passed in by the caller; stored as the token value
     * @param mapReduceContext the context supplying the workflow token
     * @throws Exception declared by the overridden method
     */
    @Override
    public void onFinish(boolean z, MapReduceContext mapReduceContext) throws Exception {
        final WorkflowToken token = mapReduceContext.getWorkflowToken();
        if (token == null) {
            return;
        }
        final Value outcome = Value.of(z);
        token.put("result", outcome);
    }
}
