/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.flume.source;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.flume.FlumeConfig;
import org.apache.pulsar.io.flume.FlumeConnector;
import org.apache.pulsar.io.flume.source.SinkOfFlume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSource<V>
extends PushSource<V> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSource.class);
    protected Thread thread = null;
    protected volatile boolean running = false;
    protected final Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("[{}] parse events has an error", (Object)t.getName(), (Object)e);

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        FlumeConfig flumeConfig = FlumeConfig.load(config);
        FlumeConnector flumeConnector = new FlumeConnector();
        flumeConnector.StartConnector(flumeConfig);
        this.start();
    }

    public abstract V extractValue(String var1);

    protected void start() {
        this.thread = new Thread(this::process);
        this.thread.setName("flume source thread");
        this.thread.setUncaughtExceptionHandler(this.handler);
        this.running = true;
        this.thread.start();
    }

    public void close() throws InterruptedException {
        log.info("close flume source");
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            this.thread.interrupt();
            this.thread.join();
        }
    }

    protected void process() {
        while (this.running) {
            try {
                log.info("start flume receive from sink process");
                while (this.running) {
                    BlockingQueue blockingQueue = SinkOfFlume.getQueue();
                    while (blockingQueue != null && !blockingQueue.isEmpty()) {
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        ObjectOutputStream out = null;
                        out = new ObjectOutputStream(bos);
                        Map message = (Map)blockingQueue.take();
                        out.writeObject(message.get("body"));
                        out.flush();
                        byte[] m = bos.toByteArray();
                        String m1 = new String(m);
                        bos.close();
                        FlumeRecord<V> flumeRecord = new FlumeRecord<V>();
                        flumeRecord.setRecord(this.extractValue(m1));
                        this.consume(flumeRecord);
                    }
                }
            }
            catch (Exception e) {
                log.error("process error!", (Throwable)e);
            }
        }
    }

    private static class FlumeRecord<V>
    implements Record<V> {
        private V record;
        private Long id;

        private FlumeRecord() {
        }

        public Optional<String> getKey() {
            return Optional.of(Long.toString(this.id));
        }

        public V getValue() {
            return this.record;
        }

        public V getRecord() {
            return this.record;
        }

        public Long getId() {
            return this.id;
        }

        public void setRecord(V record) {
            this.record = record;
        }

        public void setId(Long id) {
            this.id = id;
        }
    }
}

