package net.snowflake.ingest.streaming.internal;

import com.google.common.annotations.VisibleForTesting;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import net.snowflake.ingest.streaming.DropChannelRequest;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.internal.ChannelsStatusResponse;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ErrorCode;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.SFException;
import net.snowflake.ingest.utils.Utils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.class */
public class SnowflakeStreamingIngestChannelInternal<T> implements SnowflakeStreamingIngestChannel {
    private static final Logging logger = new Logging(SnowflakeStreamingIngestChannelInternal.class);
    private final ChannelFlushContext channelFlushContext;
    private final RowBuffer<T> rowBuffer;
    private volatile boolean isClosed;
    private final SnowflakeStreamingIngestClientInternal<T> owningClient;
    private final ChannelRuntimeState channelState;
    private final Map<String, ColumnProperties> tableColumns;

    SnowflakeStreamingIngestChannelInternal(String str, String str2, String str3, String str4, String str5, Long l, Long l2, SnowflakeStreamingIngestClientInternal<T> snowflakeStreamingIngestClientInternal, String str6, Long l3, OpenChannelRequest.OnErrorOption onErrorOption, ZoneOffset zoneOffset) {
        this(str, str2, str3, str4, str5, l, l2, snowflakeStreamingIngestClientInternal, str6, l3, onErrorOption, zoneOffset, snowflakeStreamingIngestClientInternal.getParameterProvider().getBlobFormatVersion(), null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SnowflakeStreamingIngestChannelInternal(String str, String str2, String str3, String str4, String str5, Long l, Long l2, SnowflakeStreamingIngestClientInternal<T> snowflakeStreamingIngestClientInternal, String str6, Long l3, OpenChannelRequest.OnErrorOption onErrorOption, ZoneId zoneId, Constants.BdecVersion bdecVersion, OffsetTokenVerificationFunction offsetTokenVerificationFunction) {
        this.isClosed = false;
        this.owningClient = snowflakeStreamingIngestClientInternal;
        this.channelFlushContext = new ChannelFlushContext(str, str2, str3, str4, l, str6, l3);
        this.channelState = new ChannelRuntimeState(str5, l2.longValue(), true);
        this.rowBuffer = AbstractRowBuffer.createRowBuffer(onErrorOption, zoneId, bdecVersion, getFullyQualifiedName(), (v1) -> {
            collectRowSize(v1);
        }, this.channelState, new ClientBufferParameters(this.owningClient), offsetTokenVerificationFunction, this.owningClient == null ? null : this.owningClient.getTelemetryService());
        this.tableColumns = new HashMap();
        logger.logInfo("Channel={} created for table={}", this.channelFlushContext.getName(), this.channelFlushContext.getTableName());
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getFullyQualifiedName() {
        return this.channelFlushContext.getFullyQualifiedName();
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getName() {
        return this.channelFlushContext.getName();
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getDBName() {
        return this.channelFlushContext.getDbName();
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getSchemaName() {
        return this.channelFlushContext.getSchemaName();
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getTableName() {
        return this.channelFlushContext.getTableName();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long getChannelSequencer() {
        return this.channelFlushContext.getChannelSequencer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public ChannelRuntimeState getChannelState() {
        return this.channelState;
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getFullyQualifiedTableName() {
        return this.channelFlushContext.getFullyQualifiedTableName();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelData<T> getData(String str) {
        ChannelData<T> flush = this.rowBuffer.flush(str);
        if (flush != null) {
            flush.setChannelContext(this.channelFlushContext);
        }
        return flush;
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public boolean isValid() {
        return this.channelState.isValid();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void invalidate(String str) {
        this.channelState.invalidate();
        this.rowBuffer.close("invalidate");
        logger.logWarn("Channel is invalidated, name={}, channel sequencer={}, row sequencer={}, message={}", getFullyQualifiedName(), this.channelFlushContext.getChannelSequencer(), Long.valueOf(this.channelState.getRowSequencer()), str);
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public boolean isClosed() {
        return this.isClosed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void markClosed() {
        this.isClosed = true;
        logger.logInfo("Channel is marked as closed, name={}, channel sequencer={}, row sequencer={}", getFullyQualifiedName(), this.channelFlushContext.getChannelSequencer(), Long.valueOf(this.channelState.getRowSequencer()));
    }

    CompletableFuture<Void> flush(boolean z) {
        if (!isClosed() || z) {
            return this.rowBuffer.getSize() == 0.0f ? CompletableFuture.completedFuture(null) : this.owningClient.flush(false);
        }
        throw new SFException(ErrorCode.CLOSED_CHANNEL, getFullyQualifiedName());
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public CompletableFuture<Void> close() {
        return close(false);
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public CompletableFuture<Void> close(boolean z) {
        checkValidation();
        if (isClosed()) {
            return CompletableFuture.completedFuture(null);
        }
        markClosed();
        return flush(true).thenRunAsync(() -> {
            List<SnowflakeStreamingIngestChannelInternal<?>> verifyChannelsAreFullyCommitted = this.owningClient.verifyChannelsAreFullyCommitted(Collections.singletonList(this));
            this.rowBuffer.close("close");
            this.owningClient.removeChannelIfSequencersMatch(this);
            if (!isValid() || !verifyChannelsAreFullyCommitted.isEmpty()) {
                throw new SFException(ErrorCode.CHANNELS_WITH_UNCOMMITTED_ROWS, verifyChannelsAreFullyCommitted.stream().map((v0) -> {
                    return v0.getFullyQualifiedName();
                }).collect(Collectors.toList()));
            }
            if (z) {
                this.owningClient.dropChannel(new DropChannelVersionRequest(DropChannelRequest.builder(getChannelContext().getName()).setDBName(getDBName()).setTableName(getTableName()).setSchemaName(getSchemaName()), getChannelSequencer().longValue()));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setupSchema(List<ColumnMetadata> list) {
        logger.logDebug("Setup schema for channel={}, schema={}", getFullyQualifiedName(), list);
        this.rowBuffer.setupSchema(list);
        list.forEach(columnMetadata -> {
            this.tableColumns.putIfAbsent(columnMetadata.getName(), new ColumnProperties(columnMetadata));
        });
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public InsertValidationResponse insertRow(Map<String, Object> map, String str) {
        return insertRows(Collections.singletonList(map), str, str);
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> iterable, @Nullable String str, @Nullable String str2) {
        throttleInsertIfNeeded(new MemoryInfoProviderFromRuntime());
        checkValidation();
        if (isClosed()) {
            throw new SFException(ErrorCode.CLOSED_CHANNEL, getFullyQualifiedName());
        }
        LinkedList linkedList = new LinkedList();
        iterable.forEach(map -> {
            linkedList.add(new LinkedHashMap(map));
        });
        InsertValidationResponse insertRows = this.rowBuffer.insertRows(linkedList, str, str2);
        if (this.rowBuffer.getSize() >= ((float) this.owningClient.getParameterProvider().getMaxChannelSizeInBytes())) {
            this.owningClient.setNeedFlush();
        }
        return insertRows;
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public InsertValidationResponse insertRows(Iterable<Map<String, Object>> iterable, String str) {
        return insertRows(iterable, null, str);
    }

    void collectRowSize(float f) {
        if (this.owningClient.inputThroughput != null) {
            this.owningClient.inputThroughput.mark(f);
        }
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public String getLatestCommittedOffsetToken() {
        checkValidation();
        ChannelsStatusResponse.ChannelStatusResponseDTO channelStatusResponseDTO = this.owningClient.getChannelsStatus(Collections.singletonList(this)).getChannels().get(0);
        if (channelStatusResponseDTO.getStatusCode().longValue() != 0) {
            throw new SFException(ErrorCode.CHANNEL_STATUS_INVALID, getName(), channelStatusResponseDTO.getStatusCode());
        }
        return channelStatusResponseDTO.getPersistedOffsetToken();
    }

    @Override // net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
    public Map<String, ColumnProperties> getTableSchema() {
        return this.tableColumns;
    }

    void throttleInsertIfNeeded(MemoryInfoProvider memoryInfoProvider) {
        int i = 0;
        long insertThrottleIntervalInMs = this.owningClient.getParameterProvider().getInsertThrottleIntervalInMs();
        while (true) {
            if ((hasLowRuntimeMemory(memoryInfoProvider) || (this.owningClient.getFlushService() != null && this.owningClient.getFlushService().throttleDueToQueuedFlushTasks())) && i < 60) {
                try {
                    Thread.sleep(insertThrottleIntervalInMs);
                    i++;
                } catch (InterruptedException e) {
                    throw new SFException(ErrorCode.INTERNAL_ERROR, "Insert throttle get interrupted");
                }
            }
        }
        if (i > 0) {
            logger.logInfo("Insert throttled for a total of {} milliseconds, retryCount={}, client={}, channel={}", Long.valueOf(i * insertThrottleIntervalInMs), Integer.valueOf(i), this.owningClient.getName(), getFullyQualifiedName());
        }
    }

    private boolean hasLowRuntimeMemory(MemoryInfoProvider memoryInfoProvider) {
        int insertThrottleThresholdInBytes = this.owningClient.getParameterProvider().getInsertThrottleThresholdInBytes();
        int insertThrottleThresholdInPercentage = this.owningClient.getParameterProvider().getInsertThrottleThresholdInPercentage();
        long maxMemoryLimitInBytes = this.owningClient.getParameterProvider().getMaxMemoryLimitInBytes();
        long maxMemory = maxMemoryLimitInBytes == -1 ? memoryInfoProvider.getMaxMemory() : maxMemoryLimitInBytes;
        long freeMemory = memoryInfoProvider.getFreeMemory() + (memoryInfoProvider.getMaxMemory() - memoryInfoProvider.getTotalMemory());
        boolean z = freeMemory < ((long) insertThrottleThresholdInBytes) && (freeMemory * 100) / maxMemory < ((long) insertThrottleThresholdInPercentage);
        if (z) {
            logger.logWarn("Throttled due to memory pressure, client={}, channel={}.", this.owningClient.getName(), getFullyQualifiedName());
            Utils.showMemory();
        }
        return z;
    }

    private void checkValidation() {
        if (isValid()) {
            return;
        }
        this.owningClient.removeChannelIfSequencersMatch(this);
        this.rowBuffer.close("checkValidation");
        throw new SFException(ErrorCode.INVALID_CHANNEL, getFullyQualifiedName());
    }

    RowBuffer<T> getRowBuffer() {
        return this.rowBuffer;
    }

    @VisibleForTesting
    public ChannelFlushContext getChannelContext() {
        return this.channelFlushContext;
    }
}
