/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KStreamKTableJoinProcessor<K1, K2, V1, V2, R>
extends AbstractProcessor<K1, V1> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
    private final KTableValueGetter<K2, V2> valueGetter;
    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
    private final boolean leftJoin;
    private StreamsMetricsImpl metrics;

    KStreamKTableJoinProcessor(KTableValueGetter<K2, V2> valueGetter, KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper, ValueJoiner<? super V1, ? super V2, ? extends R> joiner, boolean leftJoin) {
        this.valueGetter = valueGetter;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        this.metrics = (StreamsMetricsImpl)context.metrics();
        this.valueGetter.init(context);
    }

    @Override
    public void process(K1 key, V1 value) {
        if (key == null || value == null) {
            LOG.warn("Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", key, value, this.context().topic(), this.context().partition(), this.context().offset());
            this.metrics.skippedRecordsSensor().record();
        } else {
            Object value2;
            K2 mappedKey = this.keyMapper.apply(key, value);
            Object V2 = value2 = mappedKey == null ? null : (Object)this.valueGetter.get(mappedKey);
            if (this.leftJoin || value2 != null) {
                this.context().forward(key, this.joiner.apply(value, value2));
            }
        }
    }

    @Override
    public void close() {
        this.valueGetter.close();
    }
}

