package io.hstream.impl;

import com.google.protobuf.Empty;
import com.google.protobuf.Struct;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import io.hstream.CommandPushQuery;
import io.hstream.ConsumerBuilder;
import io.hstream.DeleteStreamRequest;
import io.hstream.DeleteSubscriptionRequest;
import io.hstream.HRecord;
import io.hstream.HStreamApiGrpc;
import io.hstream.HStreamClient;
import io.hstream.Observer;
import io.hstream.ProducerBuilder;
import io.hstream.Publisher;
import io.hstream.Stream;
import io.hstream.Subscription;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/hstream/impl/ClientImpl.class */
/**
 * {@link HStreamClient} implementation backed by a single gRPC {@link ManagedChannel}.
 *
 * <p>One asynchronous stub and one blocking stub are created over the same channel in the
 * constructor and reused by every operation. Call {@link #close()} when the client is no longer
 * needed so the channel is shut down.
 */
public class ClientImpl implements HStreamClient {
    private static final Logger logger = LoggerFactory.getLogger(ClientImpl.class);

    /** Replication factor applied to every stream created through {@link #createStream(String)}. */
    private static final int DEFAULT_REPLICATION_FACTOR = 3;

    /** Deadline, in milliseconds, applied to the delete-subscription RPC. */
    private static final long DELETE_SUBSCRIPTION_DEADLINE_MS = 1000L;

    /** Upper bound, in milliseconds, that {@link #close()} waits for the channel to terminate. */
    private static final long CLOSE_TIMEOUT_MS = 5000L;

    /** Key of the field in each push-query result that holds the record payload. */
    private static final String SELECT_FIELD = "SELECT";

    private final ManagedChannel managedChannel;
    private final HStreamApiGrpc.HStreamApiStub stub;
    private final HStreamApiGrpc.HStreamApiBlockingStub blockingStub;

    /**
     * Opens a plaintext (non-TLS) channel to the given target and creates the stubs on top of it.
     *
     * @param str gRPC target string passed to {@link ManagedChannelBuilder#forTarget(String)}
     */
    public ClientImpl(String str) {
        ManagedChannel channel = ManagedChannelBuilder.forTarget(str).usePlaintext().build();
        this.managedChannel = channel;
        this.stub = HStreamApiGrpc.newStub(channel);
        this.blockingStub = HStreamApiGrpc.newBlockingStub(channel);
    }

    /**
     * Returns a producer builder that shares this client's asynchronous stub.
     *
     * @return a new {@link ProducerBuilder}
     */
    @Override
    public ProducerBuilder newProducer() {
        return new ProducerBuilder(this.stub);
    }

    /**
     * Returns a consumer builder that shares this client's asynchronous and blocking stubs.
     *
     * @return a new {@link ConsumerBuilder}
     */
    @Override
    public ConsumerBuilder newConsumer() {
        return new ConsumerBuilder(this.stub, this.blockingStub);
    }

    /**
     * Builds a publisher for a push query. No RPC is issued until
     * {@link Publisher#subscribe(Observer)} is called; every subscription starts its own
     * {@code executePushQuery} call with the same request.
     *
     * @param str query text sent to the server
     * @return a publisher that forwards each result's {@code "SELECT"} struct as an {@link HRecord}
     */
    @Override
    public Publisher<HRecord> streamQuery(String str) {
        final CommandPushQuery request = CommandPushQuery.newBuilder().setQueryText(str).m134build();
        return new Publisher<HRecord>() {
            @Override
            public void subscribe(final Observer<? super HRecord> observer) {
                // Fail fast on the caller's thread instead of surfacing an NPE later
                // from inside a gRPC callback, after the RPC has already been started.
                if (observer == null) {
                    throw new NullPointerException("observer");
                }
                ClientImpl.this.stub.executePushQuery(request, new StreamObserver<Struct>() {
                    @Override
                    public void onNext(Struct struct) {
                        Struct payload = struct.getFieldsOrThrow(SELECT_FIELD).getStructValue();
                        observer.onNext(new HRecord(payload));
                    }

                    @Override
                    public void onError(Throwable th) {
                        observer.onError(th);
                    }

                    @Override
                    public void onCompleted() {
                        observer.onCompleted();
                    }
                });
            }
        };
    }

    /**
     * Creates a stream with the given name and a replication factor of
     * {@value #DEFAULT_REPLICATION_FACTOR}, blocking until the RPC returns.
     *
     * @param str name of the stream to create
     */
    @Override
    public void createStream(String str) {
        Stream stream =
                Stream.newBuilder()
                        .setStreamName(str)
                        .setReplicationFactor(DEFAULT_REPLICATION_FACTOR)
                        .m802build();
        this.blockingStub.createStream(stream);
    }

    /**
     * Deletes the named stream, blocking until the RPC returns, then logs completion.
     *
     * @param str name of the stream to delete
     */
    @Override
    public void deleteStream(String str) {
        DeleteStreamRequest request = DeleteStreamRequest.newBuilder().setStreamName(str).m322build();
        this.blockingStub.deleteStream(request);
        logger.info("delete stream {} done", str);
    }

    /**
     * Fetches the streams reported by the server.
     *
     * @return the stream list carried in the server's response
     */
    @Override
    public List<Stream> listStreams() {
        Empty request = Empty.newBuilder().build();
        return this.blockingStub.listStreams(request).getStreamsList();
    }

    /**
     * Sends the given subscription to the server's create-subscription RPC and blocks until it
     * returns.
     *
     * @param subscription subscription definition forwarded unchanged
     */
    @Override
    public void createSubscription(Subscription subscription) {
        this.blockingStub.createSubscription(subscription);
    }

    /**
     * Fetches the subscriptions reported by the server.
     *
     * @return the subscription list carried in the server's response
     */
    @Override
    public List<Subscription> listSubscriptions() {
        Empty request = Empty.newBuilder().build();
        return this.blockingStub.listSubscriptions(request).getSubscriptionList();
    }

    /**
     * Deletes the subscription with the given id, then logs completion. Unlike the other blocking
     * calls in this class, this RPC carries a deadline of
     * {@value #DELETE_SUBSCRIPTION_DEADLINE_MS} ms.
     *
     * @param str id of the subscription to delete
     */
    @Override
    public void deleteSubscription(String str) {
        DeleteSubscriptionRequest request =
                DeleteSubscriptionRequest.newBuilder().setSubscriptionId(str).m369build();
        this.blockingStub
                .withDeadlineAfter(DELETE_SUBSCRIPTION_DEADLINE_MS, TimeUnit.MILLISECONDS)
                .deleteSubscription(request);
        logger.info("delete subscription {} done", str);
    }

    /**
     * Forcefully shuts down the channel and waits up to {@value #CLOSE_TIMEOUT_MS} ms for it to
     * terminate.
     *
     * <p>{@code shutdownNow()} only initiates shutdown, so without the wait this method could
     * return while the channel is still tearing down. A timeout is logged rather than thrown, and
     * an interrupt during the wait is restored on the current thread, so this method itself still
     * never throws for those conditions.
     */
    @Override
    public void close() throws Exception {
        this.managedChannel.shutdownNow();
        try {
            if (!this.managedChannel.awaitTermination(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                logger.warn("gRPC channel did not terminate within {} ms", CLOSE_TIMEOUT_MS);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; the forceful shutdown is already under way.
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting for gRPC channel termination");
        }
    }
}
