package com.github.mangelion.achord;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/mangelion/achord/SendDataQueryContext.class */
public final class SendDataQueryContext<T> implements QueryContext {
    private static final int STATE_DISCONNECTED = 0;
    private static final int STATE_CONNECTED = 1;
    private static final int STATE_SERVER_INFO_RECEIVED = 2;
    private static final int STATE_SAMPLE_BLOCK_RECEIVED = 3;
    private final AuthData authData;
    private final String query;
    private final String queryId;
    private final Settings settings;
    private final Limits limits;
    private final Flow.Publisher<T[]> source;
    private final Channel channel;
    private final Flow.Subscriber<? super Void> s;
    private final EventLoopGroup workersGroup;
    private ClickHouseServerInfo serverInfo;
    private final AtomicInteger STATE = new AtomicInteger(STATE_DISCONNECTED);
    private GenericFutureListener<Future<? super Void>> catchErrorListener = future -> {
        if (future.isSuccess()) {
            return;
        }
        onChannelExceptionCaught(future.cause());
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    public SendDataQueryContext(AuthData authData, String str, String str2, Settings settings, Limits limits, Channel channel, Flow.Publisher<T[]> publisher, Flow.Subscriber<? super Void> subscriber, EventLoopGroup eventLoopGroup) {
        this.authData = authData;
        this.query = str2;
        this.queryId = str;
        this.settings = settings;
        this.limits = limits;
        this.source = publisher;
        this.channel = channel;
        this.s = subscriber;
        this.workersGroup = eventLoopGroup;
    }

    @Override // com.github.mangelion.achord.QueryContext
    public AuthData getAuthData() {
        return this.authData;
    }

    @Override // com.github.mangelion.achord.QueryContext
    public void onChannelConnected() {
        int compareAndExchange = this.STATE.compareAndExchange(STATE_DISCONNECTED, STATE_CONNECTED);
        if (compareAndExchange != 0) {
            throw new IllegalStateException("Context expected to be in STATE_DISCONNEC8TED but got " + compareAndExchange);
        }
        this.channel.writeAndFlush(new HelloMessage(this.authData)).addListener(this.catchErrorListener);
    }

    @Override // com.github.mangelion.achord.QueryContext
    public void onClickHouseServerInfoReceived(ClickHouseServerInfo clickHouseServerInfo) {
        int compareAndExchange = this.STATE.compareAndExchange(STATE_CONNECTED, STATE_SERVER_INFO_RECEIVED);
        if (compareAndExchange != STATE_CONNECTED) {
            throw new IllegalStateException("Context expected to be in STATE_CONNECTED but got " + compareAndExchange);
        }
        this.serverInfo = clickHouseServerInfo;
        this.channel.write(new SendQueryMessage(this.queryId, this.query, this.settings, this.limits, clickHouseServerInfo.serverRevision)).addListener(this.catchErrorListener);
        this.channel.writeAndFlush(DataBlock.EMPTY.retain()).addListener(this.catchErrorListener);
    }

    @Override // com.github.mangelion.achord.QueryContext
    public void onDataBlockReceived(DataBlock dataBlock) {
        int compareAndExchange = this.STATE.compareAndExchange(STATE_SERVER_INFO_RECEIVED, STATE_SAMPLE_BLOCK_RECEIVED);
        if (compareAndExchange != STATE_SERVER_INFO_RECEIVED) {
            throw new IllegalStateException("Context expected to be in STATE_SERVER_INFO_RECEIVED but got " + compareAndExchange);
        }
        EventLoop next = this.workersGroup.next();
        ObjectsToBlockProcessor objectsToBlockProcessor = new ObjectsToBlockProcessor(dataBlock, next, this.channel.alloc());
        ChannelHandler dataBlockSender = new DataBlockSender(next);
        if (this.channel.pipeline().get("blockCompressor") != null) {
            this.channel.pipeline().addBefore("blockCompressor", "reactiveBlockSender", dataBlockSender);
        } else {
            this.channel.pipeline().addFirst(new ChannelHandler[]{dataBlockSender});
        }
        next.execute(() -> {
            this.source.subscribe(objectsToBlockProcessor);
            objectsToBlockProcessor.subscribe(dataBlockSender);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.github.mangelion.achord.QueryContext
    public void onServerExceptionCaught(ClickHouseServerException clickHouseServerException) {
        this.s.onError(clickHouseServerException);
    }

    @Override // com.github.mangelion.achord.QueryContext
    public void onEndOfStream() {
        throw new IllegalStateException("End of stream should not be passed to SendDataQueryContext");
    }

    @Override // com.github.mangelion.achord.QueryContext
    public void onChannelExceptionCaught(Throwable th) {
        this.s.onError(th);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void completed() {
        this.s.onComplete();
    }
}
