package io.continual.services.processor.library.modelio.sinks;

import io.continual.builder.Builder;
import io.continual.services.model.core.Model;
import io.continual.services.model.core.ModelRequestContext;
import io.continual.services.model.core.data.JsonModelObject;
import io.continual.services.model.core.exceptions.ModelRequestException;
import io.continual.services.model.core.exceptions.ModelServiceException;
import io.continual.services.processor.config.readers.ConfigLoadContext;
import io.continual.services.processor.engine.model.Message;
import io.continual.services.processor.engine.model.MessageProcessingContext;
import io.continual.services.processor.engine.model.Sink;
import io.continual.services.processor.library.modelio.services.ModelService;
import io.continual.util.naming.Path;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/services/processor/library/modelio/sinks/ModelSink.class */
/**
 * A {@link Sink} that writes each processed message into a model.
 *
 * <p>For every message, the sink looks up a {@link ModelService} by name in the stream
 * processing context, evaluates the configured path expression against the message
 * processing context, and issues a {@code createUpdate(...).merge(...).execute()} call on
 * the service's {@link Model} with the message's JSON as the merged object.
 *
 * <p>All lifecycle and processing methods are {@code synchronized} on this instance.
 */
public class ModelSink implements Sink {
    private static final Logger log = LoggerFactory.getLogger(ModelSink.class);

    /** Name under which the {@link ModelService} is looked up; evaluated once at construction. */
    private final String fModelSvcName;

    /** Unevaluated path expression; evaluated per message in {@link #process}. */
    private final String fModelPathExpr;

    /**
     * Builds the sink from its JSON configuration.
     *
     * <p>Reads two string settings: {@code "modelName"}, which is passed through the service
     * container's expression evaluator immediately, and {@code "modelPath"}, which is stored
     * as-is and evaluated for each message.
     *
     * @param configLoadContext the load context supplying the service container
     * @param jSONObject the sink's configuration
     * @throws Builder.BuildFailure if reading a setting raises a {@link JSONException}
     *         (e.g. a key is absent or not a string)
     */
    public ModelSink(ConfigLoadContext configLoadContext, JSONObject jSONObject) throws Builder.BuildFailure {
        try {
            this.fModelSvcName = configLoadContext.getServiceContainer().getExprEval(jSONObject).evaluateText(jSONObject.getString("modelName"));
            this.fModelPathExpr = jSONObject.getString("modelPath");
        } catch (JSONException e) {
            throw new Builder.BuildFailure(e);
        }
    }

    /** No-op; this sink has nothing to set up. */
    public synchronized void init() {
    }

    /** No-op; each message is written when it is processed, so nothing is buffered here. */
    public synchronized void flush() {
    }

    /** Logs that the sink is closing; no resources are held by this class itself. */
    public synchronized void close() {
        log.warn("ModelSink closing...");
    }

    /**
     * Writes the current message into the model at the path computed from the configured
     * path expression.
     *
     * <p>If no {@link ModelService} is registered under the configured name, or if building
     * the request context or executing the update throws, the stream processing context is
     * marked as failed and the method returns without throwing.
     *
     * @param messageProcessingContext the context carrying the message to write
     */
    public synchronized void process(MessageProcessingContext messageProcessingContext) {
        try {
            ModelService modelService = (ModelService) messageProcessingContext.getStreamProcessingContext().getNamedObject(this.fModelSvcName, ModelService.class);
            if (modelService == null) {
                messageProcessingContext.getStreamProcessingContext().fail("No model service named " + this.fModelSvcName + ".");
                return;
            }

            // The request is made on behalf of the stream's operator identity.
            Model model = modelService.getModel();
            ModelRequestContext requestContext = model.getRequestContextBuilder().forUser(messageProcessingContext.getStreamProcessingContext().getOperator()).build();

            Message message = messageProcessingContext.getMessage();
            String pathText = messageProcessingContext.evalExpression(this.fModelPathExpr);
            Path targetPath = Path.fromString(pathText);

            log.info("Writing to {}", pathText);
            model.createUpdate(requestContext, targetPath).merge(new JsonModelObject(message.toJson())).execute();
        } catch (Builder.BuildFailure | ModelRequestException | ModelServiceException e) {
            // fail() only carries a string, so log the throwable here to keep the exception
            // type, stack trace and cause chain available for diagnosis.
            log.warn("Model write via service {} failed", this.fModelSvcName, e);

            // Avoid handing a null reason to fail() when the exception has no message.
            String reason = e.getMessage();
            messageProcessingContext.getStreamProcessingContext().fail(reason != null ? reason : e.toString());
        }
    }
}
