package org.springframework.integration.channel;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.1.3.RELEASE.jar:org/springframework/integration/channel/MessageChannelReactiveUtils.class */
public final class MessageChannelReactiveUtils {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.1.3.RELEASE.jar:org/springframework/integration/channel/MessageChannelReactiveUtils$PollableChannelPublisherAdapter.class */
    public static final class PollableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
        private final PollableChannel channel;

        PollableChannelPublisherAdapter(PollableChannel pollableChannel) {
            this.channel = pollableChannel;
        }

        @Override // org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super Message<T>> subscriber) {
            Flux.create(fluxSink -> {
                fluxSink.onRequest(j -> {
                    Message<?> receive;
                    while (!fluxSink.isCancelled()) {
                        long j = j;
                        j = j - 1;
                        if (j <= 0 || (receive = this.channel.receive()) == null) {
                            return;
                        } else {
                            fluxSink.next(receive);
                        }
                    }
                });
            }, FluxSink.OverflowStrategy.IGNORE).subscribeOn(Schedulers.elastic()).subscribe(subscriber);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.1.3.RELEASE.jar:org/springframework/integration/channel/MessageChannelReactiveUtils$SubscribableChannelPublisherAdapter.class */
    public static final class SubscribableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
        private final SubscribableChannel channel;

        SubscribableChannelPublisherAdapter(SubscribableChannel subscribableChannel) {
            this.channel = subscribableChannel;
        }

        @Override // org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super Message<T>> subscriber) {
            Flux.create(fluxSink -> {
                fluxSink.getClass();
                MessageHandler messageHandler = (v1) -> {
                    r0.next(v1);
                };
                this.channel.subscribe(messageHandler);
                fluxSink.onCancel(() -> {
                    this.channel.unsubscribe(messageHandler);
                });
            }, FluxSink.OverflowStrategy.IGNORE).subscribe(subscriber);
        }
    }

    private MessageChannelReactiveUtils() {
    }

    public static <T> Publisher<Message<T>> toPublisher(MessageChannel messageChannel) {
        if (messageChannel instanceof Publisher) {
            return (Publisher) messageChannel;
        }
        if (messageChannel instanceof SubscribableChannel) {
            return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
        }
        if (messageChannel instanceof PollableChannel) {
            return adaptPollableChannelToPublisher((PollableChannel) messageChannel);
        }
        throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, SubscribableChannel or PollableChannel, not: " + messageChannel);
    }

    private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel subscribableChannel) {
        return new SubscribableChannelPublisherAdapter(subscribableChannel);
    }

    private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel pollableChannel) {
        return new PollableChannelPublisherAdapter(pollableChannel);
    }
}
