package org.kevoree.library;

import com.mongodb.ServerAddress;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClientSettings;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.connection.ClusterSettings;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.json.JsonParseException;
import org.kevoree.annotation.ChannelType;
import org.kevoree.annotation.KevoreeInject;
import org.kevoree.annotation.Param;
import org.kevoree.annotation.Start;
import org.kevoree.annotation.Stop;
import org.kevoree.annotation.Update;
import org.kevoree.api.Callback;
import org.kevoree.api.ChannelContext;
import org.kevoree.api.ChannelDispatch;
import org.kevoree.log.Log;
import org.kevoree.service.ModelService;

@ChannelType(version = 1)
/* loaded from: input_file:org/kevoree/library/MongoChan.class */
public class MongoChan implements ChannelDispatch {

    @Param(optional = false)
    private String host = "localhost";

    @Param(optional = false)
    private int port = 27017;

    @Param(optional = false)
    private String database;

    @Param(optional = false)
    private String collection;
    private MongoDatabase db;
    private MongoClient mongoClient;

    @KevoreeInject
    private ChannelContext context;

    @KevoreeInject
    private ModelService modelService;
    private ScheduledExecutorService service;

    @Start
    public void start() {
        this.mongoClient = MongoClients.create(MongoClientSettings.builder().clusterSettings(ClusterSettings.builder().hosts(Arrays.asList(new ServerAddress(this.host, this.port))).build()).build());
        this.db = this.mongoClient.getDatabase(this.database);
        launchConsumers();
    }

    @Stop
    public void stop() {
        this.mongoClient.close();
    }

    @Update
    public void update() {
        stop();
        start();
    }

    public void dispatch(String str, Callback callback) {
        Document document = new Document();
        document.put("payload", getPayload(str));
        this.db.getCollection(this.collection).insertMany((List) Stream.concat(this.context.getLocalInputs().stream().map(port -> {
            return decoratePort(port.getPath(), document);
        }), this.context.getRemoteInputs().stream().map(port2 -> {
            return decoratePort(port2.getPath(), document);
        })).collect(Collectors.toList()), (r2, th) -> {
            if (th != null) {
                Log.error(th.getMessage());
            }
        });
    }

    private void launchConsumers() {
        this.service = Executors.newScheduledThreadPool(1);
        this.service.scheduleWithFixedDelay(new MongoChanFetcher(this.context.getLocalInputs(), this.mongoClient, this.database, this.collection), 0L, 1L, TimeUnit.SECONDS);
    }

    private Document decoratePort(String str, Document document) {
        Document cloning = cloning(document);
        cloning.put("port", new BsonString(str));
        return cloning;
    }

    private Document cloning(Document document) {
        return Document.parse(document.toJson());
    }

    private Object getPayload(String str) {
        String str2;
        try {
            str2 = Document.parse(str);
        } catch (JsonParseException | BsonInvalidOperationException e) {
            str2 = str;
        }
        return str2;
    }
}
