package xworker.org.apache.kafka;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.xmeta.ActionContext;
import org.xmeta.ActionException;
import org.xmeta.Thing;

/* loaded from: input_file:xworker/org/apache/kafka/XWorkerKafkaStream.class */
public class XWorkerKafkaStream {
    private static final String key = "STREAM";
    private Thing thing;
    private ActionContext actionContext;
    private KafkaStreams streams;

    public XWorkerKafkaStream(Thing thing, ActionContext actionContext) {
        this.thing = thing;
        this.actionContext = actionContext;
    }

    public synchronized void start() {
        if (this.streams != null && this.streams.state().isRunningOrRebalancing()) {
            throw new ActionException("KafkaStream already running, path=" + this.thing.getMetadata().getPath());
        }
        if (this.streams != null) {
            this.streams.close();
        }
        Properties properties = (Properties) this.thing.doAction("getConfig", this.actionContext);
        properties.put("application.id", this.thing.doAction("getApplicationId", this.actionContext));
        properties.put("default key.serde", this.thing.doAction("getKeySerdeClass", this.actionContext));
        properties.put("default value.serde", this.thing.doAction("getValueSerdeClass", this.actionContext));
        try {
            this.actionContext.push().setVarScopeFlag();
            this.thing.doAction("beforeInit", this.actionContext, new Object[]{"config", properties, "xstream", this});
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            Iterator it = this.thing.getChilds().iterator();
            while (it.hasNext()) {
                ((Thing) it.next()).doAction("build", this.actionContext, new Object[]{"builder", streamsBuilder, "xstream", this});
            }
            this.streams = new KafkaStreams(streamsBuilder.build(), properties);
            this.thing.doAction("afterInit", this.actionContext, new Object[]{"streams", this.streams, "xstream", this});
            this.actionContext.pop();
            this.streams.start();
        } catch (Throwable th) {
            this.actionContext.pop();
            throw th;
        }
    }

    public void close() {
        if (this.streams != null) {
            this.streams.close();
        }
    }

    public KafkaStreams getKafkaStreams() {
        return this.streams;
    }

    public Thing getThing() {
        return this.thing;
    }

    public ActionContext getActionContext() {
        return this.actionContext;
    }

    public static synchronized XWorkerKafkaStream getKafkaStreams(Thing thing, ActionContext actionContext) {
        XWorkerKafkaStream xWorkerKafkaStream = (XWorkerKafkaStream) thing.getStaticData(key);
        if (xWorkerKafkaStream == null) {
            xWorkerKafkaStream = new XWorkerKafkaStream(thing, actionContext);
            thing.setStaticData(key, xWorkerKafkaStream);
        }
        return xWorkerKafkaStream;
    }

    public static Class<?> getKeySerdeClass(ActionContext actionContext) {
        return SerdesActions.getSerdeClass(((Thing) actionContext.getObject("self")).getString("keySerdeClass"));
    }

    public static Class<?> getValueSerdeClass(ActionContext actionContext) {
        return SerdesActions.getSerdeClass(((Thing) actionContext.getObject("self")).getString("valueSerdeClass"));
    }

    public static void buildAddGlobalStore(ActionContext actionContext) {
        Thing thing = (Thing) actionContext.getObject("self");
        String str = (String) thing.doAction("getTopic", actionContext);
        StoreBuilder storeBuilder = null;
        Consumed consumed = null;
        ProcessorSupplier processorSupplier = null;
        StreamsBuilder streamsBuilder = (StreamsBuilder) actionContext.getObject("builder");
        Iterator it = thing.getChilds().iterator();
        while (it.hasNext()) {
            Object doAction = ((Thing) it.next()).doAction("create", actionContext);
            if ((doAction instanceof StoreBuilder) && storeBuilder != null) {
                storeBuilder = (StoreBuilder) doAction;
            }
            if ((doAction instanceof Consumed) && consumed != null) {
                consumed = (Consumed) doAction;
            }
            if ((doAction instanceof ProcessorSupplier) && processorSupplier != null) {
                processorSupplier = (ProcessorSupplier) doAction;
            }
        }
        StreamsBuilder addGlobalStore = streamsBuilder.addGlobalStore(storeBuilder, str, consumed, processorSupplier);
        Iterator it2 = thing.getChilds().iterator();
        while (it2.hasNext()) {
            ((Thing) it2.next()).doAction("build", actionContext, new Object[]{"builder", addGlobalStore});
        }
    }

    public static void buildAddStateStore(ActionContext actionContext) {
        Thing thing = (Thing) actionContext.getObject("self");
        StoreBuilder storeBuilder = null;
        StreamsBuilder streamsBuilder = (StreamsBuilder) actionContext.getObject("builder");
        Iterator it = thing.getChilds().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Object doAction = ((Thing) it.next()).doAction("create", actionContext);
            if ((doAction instanceof StoreBuilder) && 0 != 0) {
                storeBuilder = (StoreBuilder) doAction;
                break;
            }
        }
        if (storeBuilder == null) {
            return;
        }
        StreamsBuilder addStateStore = streamsBuilder.addStateStore(storeBuilder);
        Iterator it2 = thing.getChilds().iterator();
        while (it2.hasNext()) {
            ((Thing) it2.next()).doAction("build", actionContext, new Object[]{"builder", addStateStore});
        }
    }

    public static void buildGlobalTable(ActionContext actionContext) {
        Thing thing = (Thing) actionContext.getObject("self");
        String str = (String) thing.doAction("getTopic", actionContext);
        if (str == null) {
            throw new ActionException("Can not create GlobalTable, topic is null, path=" + thing.getMetadata().getPath());
        }
        StreamsBuilder streamsBuilder = (StreamsBuilder) actionContext.getObject("builder");
        Consumed consumed = null;
        Materialized materialized = null;
        Iterator it = thing.getChilds().iterator();
        while (it.hasNext()) {
            Object doAction = ((Thing) it.next()).doAction("create", actionContext);
            if ((doAction instanceof Consumed) && consumed != null) {
                consumed = (Consumed) doAction;
            }
            if (doAction instanceof Materialized) {
                materialized = (Materialized) doAction;
            }
            if (consumed != null && materialized != null) {
                break;
            }
        }
        actionContext.l().put(thing.getMetadata().getName(), (consumed == null || materialized == null) ? consumed != null ? streamsBuilder.globalTable(str, consumed) : materialized != null ? streamsBuilder.globalTable(str, materialized) : streamsBuilder.globalTable(str) : streamsBuilder.globalTable(str, consumed, materialized));
    }

    public static void buildTable(ActionContext actionContext) {
        Thing thing = (Thing) actionContext.getObject("self");
        String str = (String) thing.doAction("getTopic", actionContext);
        if (str == null) {
            throw new ActionException("Can not create GlobalTable, topic is null, path=" + thing.getMetadata().getPath());
        }
        StreamsBuilder streamsBuilder = (StreamsBuilder) actionContext.getObject("builder");
        Consumed consumed = null;
        Materialized materialized = null;
        Iterator it = thing.getChilds().iterator();
        while (it.hasNext()) {
            Object doAction = ((Thing) it.next()).doAction("create", actionContext);
            if ((doAction instanceof Consumed) && consumed != null) {
                consumed = (Consumed) doAction;
            }
            if (doAction instanceof Materialized) {
                materialized = (Materialized) doAction;
            }
            if (consumed != null && materialized != null) {
                break;
            }
        }
        KTable table = (consumed == null || materialized == null) ? consumed != null ? streamsBuilder.table(str, consumed) : materialized != null ? streamsBuilder.table(str, materialized) : streamsBuilder.table(str) : streamsBuilder.table(str, consumed, materialized);
        actionContext.l().put(thing.getMetadata().getName(), table);
        Iterator it2 = thing.getChilds().iterator();
        while (it2.hasNext()) {
            ((Thing) it2.next()).doAction("build", actionContext, new Object[]{"ktable", table});
        }
    }

    public static void buildStream(ActionContext actionContext) {
        Thing thing = (Thing) actionContext.getObject("self");
        List list = (List) thing.doAction("getTopics", actionContext);
        StreamsBuilder streamsBuilder = (StreamsBuilder) actionContext.getObject("builder");
        Consumed consumed = null;
        Iterator it = thing.getChilds().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Object doAction = ((Thing) it.next()).doAction("create", actionContext);
            if ((doAction instanceof Consumed) && 0 != 0) {
                consumed = (Consumed) doAction;
                break;
            }
        }
        KStream stream = consumed != null ? streamsBuilder.stream(list, consumed) : streamsBuilder.stream(list);
        actionContext.l().put(thing.getMetadata().getName(), stream);
        Iterator it2 = thing.getChilds().iterator();
        while (it2.hasNext()) {
            ((Thing) it2.next()).doAction("build", actionContext, new Object[]{"kstream", stream});
        }
    }
}
