package io.hstream.impl;

import com.google.common.util.concurrent.AbstractService;
import io.grpc.stub.StreamObserver;
import io.hstream.Consumer;
import io.hstream.HRecord;
import io.hstream.HStreamClient;
import io.hstream.Observer;
import io.hstream.Queryer;
import io.hstream.Subscription;
import io.hstream.internal.CreateQueryStreamRequest;
import io.hstream.internal.CreateQueryStreamResponse;
import io.hstream.internal.HStreamApiGrpc;
import io.hstream.internal.Stream;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hstream/impl/QueryerImpl.class */
public class QueryerImpl extends AbstractService implements Queryer {
    private static final Logger logger = LoggerFactory.getLogger(QueryerImpl.class);
    private static final String STREAM_QUERY_STREAM_PREFIX = "STREAM-QUERY-";
    private static final String STREAM_QUERY_SUBSCRIPTION_PREFIX = "STREAM-QUERY-";
    private final HStreamClient client;
    private final List<String> serverUrls;
    private final ChannelProvider channelProvider;
    private HStreamApiGrpc.HStreamApiStub queryStub = createQueryStub();
    private final String sql;
    private final Observer<HRecord> resultObserver;
    private Consumer queryInnerConsumer;

    public QueryerImpl(HStreamClient hStreamClient, List<String> list, ChannelProvider channelProvider, String str, Observer<HRecord> observer) {
        this.client = hStreamClient;
        this.serverUrls = list;
        this.channelProvider = channelProvider;
        this.sql = str;
        this.resultObserver = observer;
    }

    private HStreamApiGrpc.HStreamApiStub createQueryStub() {
        return HStreamApiGrpc.newStub(this.channelProvider.get(this.serverUrls.get(0)));
    }

    protected void doStart() {
        final String uuid = UUID.randomUUID().toString();
        this.queryStub.createQueryStream(CreateQueryStreamRequest.newBuilder().setQueryStream(Stream.newBuilder().setStreamName("STREAM-QUERY-" + uuid).setReplicationFactor(3).m1287build()).setQueryStatements(this.sql).m288build(), new StreamObserver<CreateQueryStreamResponse>() { // from class: io.hstream.impl.QueryerImpl.1
            public void onNext(CreateQueryStreamResponse createQueryStreamResponse) {
                QueryerImpl.logger.info("query [{}] created, related result stream is [{}]", createQueryStreamResponse.getStreamQuery().getId(), createQueryStreamResponse.getQueryStream().getStreamName());
                QueryerImpl.this.client.createSubscription(Subscription.newBuilder().subscription("STREAM-QUERY-" + uuid).stream("STREAM-QUERY-" + uuid).ackTimeoutSeconds(10).build());
                QueryerImpl.this.queryInnerConsumer = QueryerImpl.this.client.newConsumer().subscription("STREAM-QUERY-" + uuid).hRecordReceiver((receivedHRecord, responder) -> {
                    try {
                        QueryerImpl.this.resultObserver.onNext(receivedHRecord.getHRecord());
                        responder.ack();
                    } catch (Throwable th) {
                        QueryerImpl.this.resultObserver.onError(th);
                    }
                }).build();
                QueryerImpl.this.queryInnerConsumer.startAsync().awaitRunning();
                QueryerImpl.this.notifyStarted();
            }

            public void onError(Throwable th) {
                QueryerImpl.logger.error("creating stream query happens error: ", th);
                QueryerImpl.this.notifyFailed(th);
            }

            public void onCompleted() {
            }
        });
    }

    protected void doStop() {
        new Thread(() -> {
            this.queryInnerConsumer.stopAsync().awaitTerminated();
            notifyStopped();
        }).start();
    }
}
