package io.kubemq.sdk.pubsub;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.function.Consumer;
import kubemq.Kubemq;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kubemq/sdk/pubsub/EventsSubscription.class */
public class EventsSubscription {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(EventsSubscription.class);
    private String channel;
    private String group;
    private Consumer<EventMessageReceived> onReceiveEventCallback;
    private Consumer<String> onErrorCallback;
    private transient StreamObserver<Kubemq.EventReceive> observer;

    @Generated
    /* loaded from: input_file:io/kubemq/sdk/pubsub/EventsSubscription$EventsSubscriptionBuilder.class */
    public static class EventsSubscriptionBuilder {

        @Generated
        private String channel;

        @Generated
        private String group;

        @Generated
        private Consumer<EventMessageReceived> onReceiveEventCallback;

        @Generated
        private Consumer<String> onErrorCallback;

        @Generated
        private StreamObserver<Kubemq.EventReceive> observer;

        @Generated
        EventsSubscriptionBuilder() {
        }

        @Generated
        public EventsSubscriptionBuilder channel(String str) {
            this.channel = str;
            return this;
        }

        @Generated
        public EventsSubscriptionBuilder group(String str) {
            this.group = str;
            return this;
        }

        @Generated
        public EventsSubscriptionBuilder onReceiveEventCallback(Consumer<EventMessageReceived> consumer) {
            this.onReceiveEventCallback = consumer;
            return this;
        }

        @Generated
        public EventsSubscriptionBuilder onErrorCallback(Consumer<String> consumer) {
            this.onErrorCallback = consumer;
            return this;
        }

        @Generated
        public EventsSubscriptionBuilder observer(StreamObserver<Kubemq.EventReceive> streamObserver) {
            this.observer = streamObserver;
            return this;
        }

        @Generated
        public EventsSubscription build() {
            return new EventsSubscription(this.channel, this.group, this.onReceiveEventCallback, this.onErrorCallback, this.observer);
        }

        @Generated
        public String toString() {
            return "EventsSubscription.EventsSubscriptionBuilder(channel=" + this.channel + ", group=" + this.group + ", onReceiveEventCallback=" + this.onReceiveEventCallback + ", onErrorCallback=" + this.onErrorCallback + ", observer=" + this.observer + ")";
        }
    }

    public void raiseOnReceiveMessage(EventMessageReceived eventMessageReceived) {
        if (this.onReceiveEventCallback != null) {
            this.onReceiveEventCallback.accept(eventMessageReceived);
        }
    }

    public void raiseOnError(String str) {
        if (this.onErrorCallback != null) {
            this.onErrorCallback.accept(str);
        }
    }

    public void cancel() {
        if (this.observer != null) {
            this.observer.onCompleted();
            log.debug("Subscription Cancelled");
        }
    }

    public void validate() {
        if (this.channel == null || this.channel.isEmpty()) {
            throw new IllegalArgumentException("Event subscription must have a channel.");
        }
        if (this.onReceiveEventCallback == null) {
            throw new IllegalArgumentException("Event subscription must have a onReceiveEventCallback function.");
        }
    }

    public Kubemq.Subscribe encode(String str, final PubSubClient pubSubClient) {
        Kubemq.Subscribe m730build = Kubemq.Subscribe.newBuilder().setChannel(this.channel).setGroup(this.group != null ? this.group : "").setClientID(str).setSubscribeTypeData(Kubemq.Subscribe.SubscribeType.Events).setSubscribeTypeDataValue(1).m730build();
        this.observer = new StreamObserver<Kubemq.EventReceive>() { // from class: io.kubemq.sdk.pubsub.EventsSubscription.1
            public void onNext(Kubemq.EventReceive eventReceive) {
                EventsSubscription.log.debug("Event Received Event: EventID:'{}', Channel:'{}', Metadata: '{}'", new Object[]{eventReceive.getEventID(), eventReceive.getChannel(), eventReceive.getMetadata()});
                EventsSubscription.this.raiseOnReceiveMessage(EventMessageReceived.decode(eventReceive));
            }

            public void onError(Throwable th) {
                EventsSubscription.log.error("Error:-- > " + th.getMessage());
                EventsSubscription.this.raiseOnError(th.getMessage());
                if (th instanceof StatusRuntimeException) {
                    EventsSubscription.this.reconnect(pubSubClient);
                }
            }

            public void onCompleted() {
                EventsSubscription.log.debug("StreamObserver completed.");
            }
        };
        return m730build;
    }

    private void reconnect(PubSubClient pubSubClient) {
        try {
            Thread.sleep(pubSubClient.getReconnectIntervalSeconds());
            log.debug("Attempting to re-subscribe... ");
            pubSubClient.getAsyncClient().subscribeToEvents(encode(pubSubClient.getClientId(), pubSubClient), getObserver());
            log.debug("Re-subscribed successfully");
        } catch (Exception e) {
            log.error("Re-subscribe attempt failed", e);
            reconnect(pubSubClient);
        }
    }

    public String toString() {
        return String.format("EventsSubscription: channel=%s, group=%s", this.channel, this.group);
    }

    @Generated
    public static EventsSubscriptionBuilder builder() {
        return new EventsSubscriptionBuilder();
    }

    @Generated
    public String getChannel() {
        return this.channel;
    }

    @Generated
    public String getGroup() {
        return this.group;
    }

    @Generated
    public Consumer<EventMessageReceived> getOnReceiveEventCallback() {
        return this.onReceiveEventCallback;
    }

    @Generated
    public Consumer<String> getOnErrorCallback() {
        return this.onErrorCallback;
    }

    @Generated
    public StreamObserver<Kubemq.EventReceive> getObserver() {
        return this.observer;
    }

    @Generated
    public void setChannel(String str) {
        this.channel = str;
    }

    @Generated
    public void setGroup(String str) {
        this.group = str;
    }

    @Generated
    public void setOnReceiveEventCallback(Consumer<EventMessageReceived> consumer) {
        this.onReceiveEventCallback = consumer;
    }

    @Generated
    public void setOnErrorCallback(Consumer<String> consumer) {
        this.onErrorCallback = consumer;
    }

    @Generated
    public EventsSubscription() {
    }

    @Generated
    public EventsSubscription(String str, String str2, Consumer<EventMessageReceived> consumer, Consumer<String> consumer2, StreamObserver<Kubemq.EventReceive> streamObserver) {
        this.channel = str;
        this.group = str2;
        this.onReceiveEventCallback = consumer;
        this.onErrorCallback = consumer2;
        this.observer = streamObserver;
    }

    @Generated
    public void setObserver(StreamObserver<Kubemq.EventReceive> streamObserver) {
        this.observer = streamObserver;
    }
}
