package org.ianitrix.kafka.interceptors;

import java.time.Instant;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.ianitrix.kafka.interceptors.pojo.TraceType;
import org.ianitrix.kafka.interceptors.pojo.TracingKey;
import org.ianitrix.kafka.interceptors.pojo.TracingValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/ianitrix/kafka/interceptors/ConsumerTracingInterceptor.class */
/**
 * Consumer-side interceptor that publishes one trace per record handed to the consumer and one
 * trace per committed partition offset.
 *
 * <p>Every trace is made of a {@link TracingKey} and a {@link TracingValue} and is forwarded to
 * the {@code sendTrace} method inherited from {@link AbstractTracingInterceptor}. Both parts carry
 * the consumer group id taken from the {@code group.id} configuration entry.
 */
public class ConsumerTracingInterceptor extends AbstractTracingInterceptor implements ConsumerInterceptor<Object, Object> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerTracingInterceptor.class);

    /** Value of the {@code group.id} configuration entry; set by {@link #configure(Map)}. */
    private String groupId;

    /**
     * Lets the parent class configure itself, then remembers the consumer group id.
     *
     * @param map the configuration handed to this interceptor
     */
    @Override
    public void configure(Map<String, ?> map) {
        super.configure(map);
        final Object configuredGroupId = map.get("group.id");
        this.groupId = (String) configuredGroupId;
    }

    /** Delegates to the parent class; this class adds nothing of its own to close. */
    @Override
    public void close() {
        super.close();
    }

    /**
     * Sends a {@link TraceType#CONSUME} trace for each record of the batch.
     *
     * @param consumerRecords the batch of records
     * @return the very same {@code consumerRecords} instance, unmodified by this method
     */
    public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> consumerRecords) {
        for (final ConsumerRecord<Object, Object> consumed : consumerRecords) {
            sendConsume(consumed);
        }
        return consumerRecords;
    }

    /**
     * Builds and sends the CONSUME trace of a single record.
     *
     * @param consumerRecord the record to trace
     */
    private void sendConsume(ConsumerRecord<Object, Object> consumerRecord) {
        final String topic = consumerRecord.topic();
        final Integer partition = consumerRecord.partition();
        final Long offset = consumerRecord.offset();

        final TracingKey key = TracingKey.builder()
                .topic(topic)
                .partition(partition)
                .offset(offset)
                .groupId(this.groupId)
                .build();

        // The correlation id is resolved from the record headers by the parent class,
        // before the trace timestamp is taken.
        final TracingValue value = TracingValue.builder()
                .topic(topic)
                .partition(partition)
                .offset(offset)
                .correlationId(super.getOrCreateCorrelationID(consumerRecord.headers()))
                .date(Instant.now().toString())
                .groupId(this.groupId)
                .type(TraceType.CONSUME)
                .build();

        super.sendTrace(key, value);
    }

    /**
     * Sends a {@link TraceType#COMMIT} trace for each partition present in the commit.
     *
     * @param map the committed offsets, keyed by partition
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> committed : map.entrySet()) {
            sendCommit(committed.getKey(), committed.getValue());
        }
    }

    /**
     * Builds and sends the COMMIT trace of a single partition.
     *
     * @param topicPartition the partition whose offset was committed
     * @param offsetAndMetadata the committed offset information
     */
    private void sendCommit(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        final String topic = topicPartition.topic();
        final Integer partition = topicPartition.partition();
        // The trace reports the committed offset minus one
        // (presumably because a committed offset designates the next record to read - confirm).
        final Long tracedOffset = offsetAndMetadata.offset() - 1;

        final TracingKey key = TracingKey.builder()
                .topic(topic)
                .partition(partition)
                .offset(tracedOffset)
                .groupId(this.groupId)
                .build();

        final TracingValue value = TracingValue.builder()
                .topic(topic)
                .partition(partition)
                .offset(tracedOffset)
                .date(Instant.now().toString())
                .type(TraceType.COMMIT)
                .groupId(this.groupId)
                .build();

        super.sendTrace(key, value);
    }
}
