package it.netgrid.bauer.impl.impl;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORParser;
import com.google.inject.Inject;
import it.netgrid.bauer.impl.StreamConfig;
import it.netgrid.bauer.impl.StreamManager;
import it.netgrid.bauer.impl.StreamMessageConsumer;
import it.netgrid.bauer.impl.StreamsProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:it/netgrid/bauer/impl/impl/StreamThreadedManager.class */
/**
 * {@link StreamManager} implementation that reads CBOR-encoded messages from the
 * input stream supplied by a {@link StreamsProvider}, dispatches every decoded
 * message to the registered {@link StreamMessageConsumer}s, and writes outgoing
 * messages (CBOR-encoded) to the provider's output stream.
 *
 * <p>Work is executed on a fixed pool of two threads: the fetch loop submitted by
 * {@link #start()} and the write tasks submitted by {@link #postMessage(JsonNode)}
 * share that pool.
 */
public class StreamThreadedManager implements StreamManager {
    private static final Logger log = LoggerFactory.getLogger(StreamThreadedManager.class);
    private final StreamsProvider streams;
    private final StreamConfig config;
    /** Handle on the currently running (or last finished) fetch loop; {@code null} until first started. */
    private Future<Integer> parser;
    private final CBORFactory cf = new CBORFactory();
    private final ObjectMapper om = new ObjectMapper(this.cf);
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final List<StreamMessageConsumer> consumers = new ArrayList<>();

    /**
     * Creates a manager bound to the given configuration and stream provider.
     *
     * @param streamConfig    configuration consulted for the message-bubbling flag
     * @param streamsProvider source of the input and output streams
     */
    @Inject
    public StreamThreadedManager(StreamConfig streamConfig, StreamsProvider streamsProvider) {
        this.config = streamConfig;
        this.streams = streamsProvider;
    }

    /**
     * Reads CBOR values from the input stream until the parser reports end of
     * input, passing each decoded tree to {@link #trigger(JsonNode)}.
     *
     * <p>A {@link JsonParseException} raised while decoding a single value is
     * logged and the loop moves on; any other {@link IOException} (including one
     * thrown while advancing to the next token) is logged and ends the loop.
     *
     * @return always {@code 0}
     */
    public Integer runCborMessageFetch() {
        try {
            CBORParser cborParser = this.cf.createParser(this.streams.input());
            // nextToken() yields null once the input is exhausted. The previous guard,
            // InputStream.available() != -1, could never be false because available()
            // is specified to return zero or more, so the loop kept spinning after EOF.
            while (cborParser.nextToken() != null) {
                try {
                    JsonNode message = this.om.readTree(cborParser);
                    trigger(message);
                } catch (JsonParseException e) {
                    log.warn("parsing failed: {}", e.getMessage());
                }
            }
        } catch (IOException e) {
            log.warn("read failed: {}", e.getMessage());
        }
        return 0;
    }

    /**
     * Registers a consumer and starts the fetch loop when it is the first one.
     *
     * @param streamMessageConsumer consumer to notify for every incoming message
     */
    @Override // it.netgrid.bauer.impl.StreamManager
    public synchronized void addMessageConsumer(StreamMessageConsumer streamMessageConsumer) {
        this.consumers.add(streamMessageConsumer);
        if (this.consumers.size() == 1) {
            start();
        }
    }

    /**
     * Registers a consumer without taking this object's monitor and without
     * starting the fetch loop.
     *
     * @param streamMessageConsumer consumer to add to the list
     */
    public void unsafeAddMessageConsumer(StreamMessageConsumer streamMessageConsumer) {
        this.consumers.add(streamMessageConsumer);
    }

    /**
     * Schedules the message to be written on the executor; {@code null} messages
     * are ignored.
     *
     * @param jsonNode message to write, may be {@code null}
     */
    @Override // it.netgrid.bauer.impl.StreamManager
    public void postMessage(JsonNode jsonNode) {
        if (jsonNode == null) {
            return;
        }
        this.executor.submit(() -> unsafePostMessage(jsonNode));
    }

    /**
     * Serializes the message with the CBOR-backed mapper, writes it to the output
     * stream and flushes, on the calling thread. Serialization and I/O failures
     * are logged, not propagated.
     *
     * @param jsonNode message to write
     */
    public synchronized void unsafePostMessage(JsonNode jsonNode) {
        try {
            this.streams.output().write(this.om.writeValueAsBytes(jsonNode));
            this.streams.output().flush();
        } catch (JsonProcessingException e) {
            log.warn("unable to process: {}", e.getMessage());
        } catch (IOException e) {
            log.warn("unable to post: {}", e.getMessage());
        }
    }

    /**
     * Submits the fetch loop to the executor unless a previous submission is
     * still running.
     */
    public synchronized void start() {
        if (this.parser == null || this.parser.isDone()) {
            this.parser = this.executor.submit(() -> runCborMessageFetch());
        }
    }

    /**
     * Delivers the message to every registered consumer and, when message
     * bubbling is enabled in the configuration, re-posts it to the output.
     * {@code null} messages are ignored.
     *
     * @param jsonNode incoming message, may be {@code null}
     */
    public synchronized void trigger(JsonNode jsonNode) {
        if (jsonNode == null) {
            return;
        }
        for (StreamMessageConsumer consumer : this.consumers) {
            consumer.consume(jsonNode);
        }
        // Boolean.TRUE.equals(...) treats a null flag as "disabled" instead of
        // throwing on unboxing, which would terminate the fetch loop.
        if (Boolean.TRUE.equals(this.config.isMessageBubblingEnabled())) {
            postMessage(jsonNode);
            log.debug("bubbling");
        }
    }
}
