package io.kipe.streams.kafka.processors;

import java.time.Duration;
import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kipe/streams/kafka/processors/JoinBuilder.class */
public class JoinBuilder<K, V, OV, VR> extends AbstractTopologyPartBuilder<K, V> {
    static final Logger LOG = LoggerFactory.getLogger(JoinBuilder.class);
    private final KStream<K, OV> otherStream;
    private final Serde<OV> otherValueSerde;
    private Duration windowSizeBefore;
    private Duration windowSizeAfter;
    private Duration retentionPeriod;

    /* JADX INFO: Access modifiers changed from: package-private */
    public JoinBuilder(StreamsBuilder streamsBuilder, KStream<K, V> kStream, Serde<K> serde, Serde<V> serde2, KStream<K, OV> kStream2, Serde<OV> serde3, String str) {
        super(streamsBuilder, kStream, serde, serde2, str);
        Objects.requireNonNull(kStream2, "otherStream");
        if (serde3 == null) {
            LOG.warn("The default otherValueSerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }
        this.otherStream = kStream2;
        this.otherValueSerde = serde3;
    }

    public JoinBuilder<K, V, OV, VR> withWindowSize(Duration duration) {
        this.windowSizeBefore = duration;
        this.windowSizeAfter = duration;
        return this;
    }

    public JoinBuilder<K, V, OV, VR> withWindowSizeBefore(Duration duration) {
        this.windowSizeBefore = duration;
        return this;
    }

    public JoinBuilder<K, V, OV, VR> withWindowSizeAfter(Duration duration) {
        this.windowSizeAfter = duration;
        return this;
    }

    public JoinBuilder<K, V, OV, VR> withRetentionPeriod(Duration duration) {
        this.retentionPeriod = duration;
        return this;
    }

    public KipesBuilder<K, VR> as(ValueJoiner<V, OV, VR> valueJoiner, Serde<VR> serde) {
        Objects.requireNonNull(getTopicsBaseName(), "topicsBaseName");
        if (this.windowSizeBefore == null) {
            this.windowSizeBefore = Duration.ZERO;
        }
        if (this.windowSizeAfter == null) {
            this.windowSizeAfter = Duration.ZERO;
        }
        Objects.requireNonNull(this.retentionPeriod, "retentionPeriod");
        Objects.requireNonNull(valueJoiner, "joiner");
        if (serde == null) {
            LOG.warn("The default resultValueSerde is being used. To customize serdes, provide a specific serde to override this behavior.");
        }
        return (KipesBuilder<K, VR>) createKipesBuilder(this.stream.join(this.otherStream, valueJoiner, JoinWindows.of(Duration.ZERO).before(this.windowSizeBefore).after(this.windowSizeAfter).grace(this.retentionPeriod), StreamJoined.with(Stores.persistentWindowStore(getTopicsBaseName() + "-join-store-left", this.retentionPeriod.plus(this.windowSizeBefore).plus(this.windowSizeAfter), this.windowSizeBefore.plus(this.windowSizeAfter), true), Stores.persistentWindowStore(getTopicsBaseName() + "-join-store-right", this.retentionPeriod.plus(this.windowSizeBefore).plus(this.windowSizeAfter), this.windowSizeBefore.plus(this.windowSizeAfter), true)).withKeySerde(this.keySerde).withValueSerde(this.valueSerde).withOtherValueSerde(this.otherValueSerde)), this.keySerde, serde);
    }

    public KipesBuilder<K, VR> as(ValueJoiner<V, OV, VR> valueJoiner) {
        return as(valueJoiner, null);
    }
}
