package io.odpf.depot.bigtable;

import io.odpf.depot.OdpfSink;
import io.odpf.depot.OdpfSinkResponse;
import io.odpf.depot.bigtable.client.BigTableClient;
import io.odpf.depot.bigtable.model.BigTableRecord;
import io.odpf.depot.bigtable.parser.BigTableRecordParser;
import io.odpf.depot.bigtable.parser.BigTableResponseParser;
import io.odpf.depot.bigtable.response.BigTableResponse;
import io.odpf.depot.error.ErrorInfo;
import io.odpf.depot.message.OdpfMessage;
import io.odpf.depot.metrics.BigTableMetrics;
import io.odpf.depot.metrics.Instrumentation;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/* loaded from: input_file:io/odpf/depot/bigtable/BigTableSink.class */
/**
 * {@link OdpfSink} implementation that converts incoming messages into
 * {@link BigTableRecord}s and hands the valid ones to a {@link BigTableClient}.
 *
 * <p>Records that fail conversion, and records reported as failed in the client's
 * response, are surfaced to the caller through the returned {@link OdpfSinkResponse}.
 */
public class BigTableSink implements OdpfSink {
    private final BigTableClient bigTableClient;
    private final BigTableRecordParser bigTableRecordParser;
    private final BigTableMetrics bigtableMetrics;
    private final Instrumentation instrumentation;

    /**
     * Creates a sink that uses the supplied collaborators as-is.
     *
     * @param bigTableClient       client used to send valid records
     * @param bigTableRecordParser parser that converts messages into records
     * @param bigTableMetrics      metrics handle passed on to the response parser
     * @param instrumentation      logger, also passed on to the response parser
     */
    public BigTableSink(BigTableClient bigTableClient, BigTableRecordParser bigTableRecordParser, BigTableMetrics bigTableMetrics, Instrumentation instrumentation) {
        this.bigTableClient = bigTableClient;
        this.bigTableRecordParser = bigTableRecordParser;
        this.bigtableMetrics = bigTableMetrics;
        this.instrumentation = instrumentation;
    }

    /**
     * Converts the messages, sends every valid record in a single client call and
     * collects all failures into the response.
     *
     * @param list messages to push
     * @return a response holding one error entry per invalid record, plus the errors
     *         extracted from the client response when it reports any
     */
    @Override
    public OdpfSinkResponse pushToSink(List<OdpfMessage> list) {
        List<BigTableRecord> records = this.bigTableRecordParser.convert(list);
        // partitioningBy always yields both the TRUE and FALSE keys, so neither lookup is null.
        Map<Boolean, List<BigTableRecord>> recordsByValidity = records.stream()
                .collect(Collectors.partitioningBy(BigTableRecord::isValid));
        List<BigTableRecord> invalidRecords = recordsByValidity.get(Boolean.FALSE);
        List<BigTableRecord> validRecords = recordsByValidity.get(Boolean.TRUE);

        OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse();
        // Conversion failures are reported against the index carried by the record.
        invalidRecords.forEach(invalidRecord ->
                odpfSinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));

        if (validRecords.isEmpty()) {
            // Nothing to send; only conversion errors (if any) are reported.
            return odpfSinkResponse;
        }

        BigTableResponse bigTableResponse = this.bigTableClient.send(validRecords);
        this.instrumentation.logInfo("Processed a batch of {} records to BigTable", validRecords.size());

        // A null response is treated the same as a response without errors.
        if (bigTableResponse != null && bigTableResponse.hasErrors()) {
            this.instrumentation.logInfo("Found {} Error records in response", bigTableResponse.getErrorCount());
            Map<Long, ErrorInfo> errorsFromSinkResponse = BigTableResponseParser.getErrorsFromSinkResponse(
                    validRecords, bigTableResponse, this.bigtableMetrics, this.instrumentation);
            errorsFromSinkResponse.forEach(odpfSinkResponse::addErrors);
        }
        return odpfSinkResponse;
    }

    /**
     * No-op: this sink releases nothing on close.
     *
     * <p>NOTE(review): the injected {@link BigTableClient} is not closed here; confirm
     * that its lifecycle is managed by whoever constructs this sink.
     *
     * @throws IOException never thrown by this implementation; declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        // Intentionally empty.
    }
}
