/*
 * Decompiled with CFR 0.152.
 */
package io.continual.services.processor.library.modelio.sinks;

import io.continual.builder.Builder;
import io.continual.services.model.core.Model;
import io.continual.services.model.core.ModelRequestContext;
import io.continual.services.model.core.ModelUpdater;
import io.continual.services.model.core.exceptions.ModelRequestException;
import io.continual.services.model.core.exceptions.ModelServiceException;
import io.continual.services.model.core.updaters.DataMerge;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.library.modelio.services.ModelService;
import io.continual.util.data.exprEval.ExpressionEvaluator;
import io.continual.util.naming.Path;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Sink} that stores each processed message into a {@link Model}.
 *
 * <p>Configuration keys read by the constructor:
 * <ul>
 *   <li>{@code modelName} - name used to look up the {@link ModelService} in the stream
 *       processing context; evaluated once, at construction time.</li>
 *   <li>{@code modelPath} - expression kept verbatim and evaluated against every message to
 *       produce the target path in the model.</li>
 * </ul>
 *
 * <p>All lifecycle and processing methods are {@code synchronized} on this instance.
 */
public class ModelSink
implements Sink {
    private static final Logger log = LoggerFactory.getLogger(ModelSink.class);

    /** Name of the model service, already evaluated from the {@code modelName} setting. */
    private final String fModelSvcName;

    /** Unevaluated {@code modelPath} expression; evaluated per message in {@link #process}. */
    private final String fModelPathExpr;

    /**
     * Builds the sink from its JSON configuration.
     *
     * @param clc    load context supplying the expression evaluator for {@code config}
     * @param config sink configuration; must contain string values for {@code modelName}
     *               and {@code modelPath}
     * @throws Builder.BuildFailure if a required setting is missing or not a string
     */
    public ModelSink(ConfigLoadContext clc, JSONObject config) throws Builder.BuildFailure {
        try {
            ExpressionEvaluator ee = clc.getServiceContainer().getExprEval(config);
            this.fModelSvcName = ee.evaluateText(config.getString("modelName"));
            // The path is intentionally left unevaluated: it is resolved against each message.
            this.fModelPathExpr = config.getString("modelPath");
        }
        catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    /** No-op; this sink needs no initialization. */
    public synchronized void init() {
    }

    /** No-op; each message is stored as it is processed, so nothing is buffered here. */
    public synchronized void flush() {
    }

    /** Logs that the sink is closing; no resources are held by this class. */
    public synchronized void close() {
        log.warn("ModelSink closing...");
    }

    /**
     * Stores the current message's JSON into the model at the path produced by evaluating
     * the configured {@code modelPath} expression, using a {@link DataMerge} updater.
     *
     * <p>Problems are reported through the stream processing context's {@code fail(...)}
     * rather than thrown: a missing model service, a path expression that evaluates to
     * {@code null}, or a build/request/service exception from the model layer.
     *
     * @param context the processing context carrying the message to store
     */
    public synchronized void process(MessageProcessingContext context) {
        try {
            ModelService ms = (ModelService) context.getStreamProcessingContext().getNamedObject(this.fModelSvcName, ModelService.class);
            if (ms == null) {
                context.getStreamProcessingContext().fail("No model service named " + this.fModelSvcName + ".");
                return;
            }

            String modelPathText = context.evalExpression(this.fModelPathExpr);
            if (modelPathText == null) {
                // How evalExpression/Path.fromString treat null is not visible from here;
                // report it as a processing failure instead of risking an unchecked exception.
                context.getStreamProcessingContext().fail("Model path expression [" + this.fModelPathExpr + "] evaluated to null.");
                return;
            }

            Model model = ms.getModel();
            ModelRequestContext mrc = model.getRequestContextBuilder().forUser(context.getStreamProcessingContext().getOperator()).build();
            Message msg = context.getMessage();
            Path modelPath = Path.fromString(modelPathText);
            log.info("Writing to {}", modelPathText);
            model.store(mrc, modelPath, new ModelUpdater[]{new DataMerge(msg.toJson())});
        }
        catch (Builder.BuildFailure | ModelRequestException | ModelServiceException e) {
            // Keep the stack trace in the log; fail(...) only receives a text message.
            log.warn("Failed writing message to model service {}", this.fModelSvcName, e);
            // Throwable.getMessage() may be null; fall back to toString() so the reason is never null.
            String reason = e.getMessage();
            context.getStreamProcessingContext().fail(reason != null ? reason : e.toString());
        }
    }
}

