package io.odpf.depot.redis;

import io.odpf.depot.OdpfSink;
import io.odpf.depot.OdpfSinkResponse;
import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.metrics.Instrumentation;
import io.odpf.depot.redis.client.RedisClient;
import io.odpf.depot.redis.parsers.RedisParser;
import io.odpf.depot.redis.record.RedisRecord;
import io.odpf.depot.redis.util.RedisSinkUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:io/odpf/depot/redis/RedisSink.class */
/**
 * {@link OdpfSink} implementation that writes messages to Redis.
 *
 * <p>Incoming messages are converted to {@link RedisRecord}s by the configured
 * {@link RedisParser}. Records flagged as invalid are reported as errors without
 * being sent; valid records are sent through the {@link RedisClient} and any
 * errors derived from the client's responses are reported as well.
 */
public class RedisSink implements OdpfSink {
    private final RedisClient redisClient;
    private final RedisParser redisParser;
    private final Instrumentation instrumentation;

    /**
     * Creates a sink backed by the given collaborators.
     *
     * @param redisClient     client used to send valid records
     * @param redisParser     parser that converts messages into records
     * @param instrumentation used for logging and passed to the response error extraction
     */
    public RedisSink(RedisClient redisClient, RedisParser redisParser, Instrumentation instrumentation) {
        this.redisClient = redisClient;
        this.redisParser = redisParser;
        this.instrumentation = instrumentation;
    }

    /**
     * Converts the messages to Redis records, sends the valid ones and collects
     * every error into the returned response.
     *
     * @param list messages to push
     * @return a response holding the error of each invalid record (keyed by the
     *         record's index) plus the errors extracted from the client responses
     */
    @Override // io.odpf.depot.OdpfSink
    public OdpfSinkResponse pushToSink(List<OdpfMessage> list) {
        List<RedisRecord> records = this.redisParser.convert(list);
        // partitioningBy always yields both keys, so neither lookup below returns null.
        Map<Boolean, List<RedisRecord>> recordsByValidity = records.stream()
                .collect(Collectors.partitioningBy(RedisRecord::isValid));
        List<RedisRecord> invalidRecords = recordsByValidity.get(Boolean.FALSE);
        List<RedisRecord> validRecords = recordsByValidity.get(Boolean.TRUE);

        OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse();
        // Invalid records never reach Redis; surface their errors directly.
        invalidRecords.forEach(invalidRecord ->
                odpfSinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));

        if (!validRecords.isEmpty()) {
            Map<Long, ErrorInfo> errorsFromResponse = RedisSinkUtils.getErrorsFromResponse(
                    validRecords, this.redisClient.send(validRecords), this.instrumentation);
            errorsFromResponse.forEach(odpfSinkResponse::addErrors);
            this.instrumentation.logInfo("Pushed a batch of {} records to Redis", validRecords.size());
        }
        return odpfSinkResponse;
    }

    /**
     * No-op: this method releases nothing.
     *
     * @throws IOException never thrown by this implementation; declared to match the overridden signature
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }
}
