package net.tangly.fsm.actors;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:net/tangly/fsm/actors/Channel.class */
/**
 * A named channel that forwards published messages of type {@code T} to subscribed actors.
 *
 * <p>The channel wraps a {@link SubmissionPublisher} created with its default constructor. Each
 * subscribed actor is attached through its own {@link ActorSubscriber}, which hands received
 * messages to {@code Actor#receive}. Closing the channel closes the underlying publisher.
 *
 * @param <T> type of the messages transported through the channel
 */
public class Channel<T> implements AutoCloseable {
    private final String name;
    private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();

    /**
     * Adapter connecting one actor to one channel through the {@link Flow.Subscriber} protocol.
     *
     * <p>The subscriber requests exactly one item when it is subscribed and one more item after
     * each message has been handed to the actor, so at most one message is requested at a time.
     *
     * @param <T> type of the messages delivered to the actor
     */
    static class ActorSubscriber<T> implements Flow.Subscriber<T> {
        private static final Logger logger = LogManager.getLogger();
        private final Actor<T> actor;
        private final Channel<T> channel;
        // Set in onSubscribe and used in onNext to request the next item.
        private Flow.Subscription subscription;

        /**
         * Creates a subscriber delivering the messages of the channel to the actor.
         *
         * @param actor   actor receiving the messages
         * @param channel channel the actor is subscribed to; used for log output
         * @throws NullPointerException if {@code actor} or {@code channel} is {@code null}
         */
        public ActorSubscriber(@NotNull Actor<T> actor, @NotNull Channel<T> channel) {
            // Fail fast: a null reference would otherwise only surface later, when one of the
            // subscriber callbacks dereferences it.
            this.actor = Objects.requireNonNull(actor, "actor");
            this.channel = Objects.requireNonNull(channel, "channel");
        }

        /**
         * Stores the subscription and requests the first message.
         *
         * @param subscription subscription handed over by the publisher
         */
        @Override
        public void onSubscribe(@NotNull Flow.Subscription subscription) {
            logger.atInfo().log("actor {} is subscribed to channel {}", this.actor.name(), this.channel.name());
            this.subscription = subscription;
            subscription.request(1L);
        }

        /**
         * Hands the message to the actor and then requests the next one.
         *
         * @param t message received from the channel
         */
        @Override
        public void onNext(@NotNull T t) {
            logger.atInfo().log("actor {} received message {} from channel {}", this.actor.name(), t, this.channel.name());
            this.actor.receive(t);
            // Ask for the next item only after the actor has taken the current one.
            this.subscription.request(1L);
        }

        /**
         * Logs the error reported on the subscription; no further action is taken.
         *
         * @param th error reported by the publisher or the subscription
         */
        @Override
        public void onError(Throwable th) {
            logger.atError().withThrowable(th).log("actor {} has error on channel {}", this.actor.name(), this.channel.name());
        }

        /** Logs that the subscription has completed; no further action is taken. */
        @Override
        public void onComplete() {
            logger.atInfo().log("actor {} has completed on channel {}", this.actor.name(), this.channel.name());
        }
    }

    /**
     * Creates a channel with the given name.
     *
     * @param str name of the channel, returned by {@link #name()}
     * @throws NullPointerException if {@code str} is {@code null}
     */
    public Channel(@NotNull String str) {
        this.name = Objects.requireNonNull(str, "name");
    }

    /**
     * Returns the name of the channel.
     *
     * @return name given at construction
     */
    public String name() {
        return this.name;
    }

    /**
     * Returns the publisher backing this channel.
     *
     * @return the underlying submission publisher (the internal instance, not a copy)
     */
    public SubmissionPublisher<T> publisher() {
        return this.publisher;
    }

    /**
     * Subscribes the actor to this channel through a new {@link ActorSubscriber}.
     *
     * @param actor actor that shall receive the messages published on this channel
     * @throws NullPointerException if {@code actor} is {@code null}
     */
    public void subscribe(@NotNull Actor<T> actor) {
        Objects.requireNonNull(actor, "actor");
        // Diamond operator instead of the raw type keeps the subscriber type-checked.
        this.publisher.subscribe(new ActorSubscriber<>(actor, this));
    }

    /**
     * Publishes the message by submitting it to the underlying publisher.
     *
     * <p>The value returned by {@link SubmissionPublisher#submit(Object)} is ignored.
     *
     * @param t message to publish
     */
    public void publish(@NotNull T t) {
        this.publisher.submit(t);
    }

    /** Closes the underlying publisher. */
    @Override
    public void close() {
        this.publisher.close();
    }
}
