package org.springframework.data.redis.stream;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.core.ReactiveStreamOperations;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.stream.StreamReceiver;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.4.1.jar:org/springframework/data/redis/stream/DefaultStreamReceiver.class */
public class DefaultStreamReceiver<K, V extends Record<K, ?>> implements StreamReceiver<K, V> {
    /** Commons-logging logger bound to the runtime class of this receiver. */
    private final Log logger = LogFactory.getLog(getClass());
    /** Reactive template created in the constructor from the connection factory and the configured serializers. */
    private final ReactiveRedisTemplate<K, ?> template;
    /** Stream operations obtained from {@link #template}; created with the configured hash mapper when one is set. */
    private final ReactiveStreamOperations<K, Object, Object> streamOperations;
    /** Read options assembled in the constructor from the configured batch size and poll timeout. */
    private final StreamReadOptions readOptions;
    /** The receiver options this instance was constructed with. */
    private final StreamReceiver.StreamReceiverOptions<K, V> receiverOptions;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.4.1.jar:org/springframework/data/redis/stream/DefaultStreamReceiver$PollState.class */
    public static class PollState {
        /** Demand counter: raised through {@link #setRequested}/{@link #incrementRequested}, lowered through {@link #decrementRequested}. */
        private final AtomicLong requestsPending = new AtomicLong();
        /** Starts {@code true}; flipped to {@code false} by {@link #cancel()}. */
        private final AtomicBoolean active = new AtomicBoolean(true);
        /** {@code true} between a successful {@link #activateSchedule()} and the next {@link #scheduleCompleted()}. */
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        /** Strategy used to derive the initial and each subsequent read offset. */
        private final ReadOffsetStrategy readOffsetStrategy;
        /** Offset that the next read should start from. */
        private final AtomicReference<ReadOffset> currentOffset;
        /** Consumer handed to the offset strategy; empty for standalone reads. */
        private final Optional<Consumer> consumer;

        private PollState(Optional<Consumer> optional, ReadOffsetStrategy readOffsetStrategy, ReadOffset readOffset) {
            this.consumer = optional;
            this.currentOffset = new AtomicReference<>(readOffset);
            this.readOffsetStrategy = readOffsetStrategy;
        }

        /**
         * Creates the state for a read without a consumer.
         *
         * @param readOffset the offset requested by the caller.
         * @return a new state whose initial offset is chosen by the matching {@link ReadOffsetStrategy}.
         */
        static PollState standalone(ReadOffset readOffset) {
            ReadOffsetStrategy offsetStrategy = ReadOffsetStrategy.getStrategy(readOffset);
            Optional<Consumer> noConsumer = Optional.empty();
            ReadOffset initialOffset = offsetStrategy.getFirst(readOffset, Optional.empty());
            return new PollState(noConsumer, offsetStrategy, initialOffset);
        }

        /**
         * Creates the state for a read on behalf of {@code consumer}.
         *
         * @param consumer the consumer; must not be {@code null} (wrapped with {@link Optional#of}).
         * @param readOffset the offset requested by the caller.
         * @return a new state whose initial offset is chosen by the matching {@link ReadOffsetStrategy}.
         */
        static PollState consumer(Consumer consumer, ReadOffset readOffset) {
            ReadOffsetStrategy offsetStrategy = ReadOffsetStrategy.getStrategy(readOffset);
            Optional<Consumer> wrapped = Optional.of(consumer);
            ReadOffset initialOffset = offsetStrategy.getFirst(readOffset, wrapped);
            return new PollState(wrapped, offsetStrategy, initialOffset);
        }

        /** @return {@code true} as long as {@link #cancel()} has not been called. */
        public boolean isSubscriptionActive() {
            return this.active.get();
        }

        /** Marks the subscription as no longer active. */
        public void cancel() {
            this.active.set(false);
        }

        /**
         * Makes a single attempt to consume one unit of demand.
         *
         * @return {@code true} if the counter was positive and the compare-and-set succeeded; {@code false} if
         *         there was no demand or the counter changed concurrently (no retry is performed).
         */
        boolean decrementRequested() {
            final long pending = this.requestsPending.get();
            return pending > 0 && this.requestsPending.compareAndSet(pending, pending - 1);
        }

        /** Adds one unit of demand. */
        void incrementRequested() {
            this.requestsPending.incrementAndGet();
        }

        /** @return the current demand counter value. */
        public long getRequested() {
            return this.requestsPending.get();
        }

        /**
         * Atomically replaces the demand counter.
         *
         * @param j the value the counter is expected to hold.
         * @param j2 the value to store.
         * @return {@code true} if the counter held {@code j} and was updated.
         */
        boolean setRequested(long j, long j2) {
            return this.requestsPending.compareAndSet(j, j2);
        }

        /** @return {@code true} if this call switched the scheduled flag from {@code false} to {@code true}. */
        boolean activateSchedule() {
            return this.scheduled.compareAndSet(false, true);
        }

        /** @return the current value of the scheduled flag. */
        public boolean isScheduled() {
            return this.scheduled.get();
        }

        /** Clears the scheduled flag if it is currently set. */
        void scheduleCompleted() {
            this.scheduled.compareAndSet(true, false);
        }

        /**
         * Advances the read offset past the record with the given id, as decided by the offset strategy.
         *
         * @param str the id value of the record that was just received.
         */
        void updateReadOffset(String str) {
            ReadOffset previous = getCurrentReadOffset();
            ReadOffset next = this.readOffsetStrategy.getNext(previous, this.consumer, str);
            this.currentOffset.set(next);
        }

        /** @return the offset the next read should use. */
        ReadOffset getCurrentReadOffset() {
            return this.currentOffset.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/spring-data-redis-2.4.1.jar:org/springframework/data/redis/stream/DefaultStreamReceiver$StreamSubscription.class */
    public class StreamSubscription {
        /** Holds records that arrived while the downstream had no demand; drained by {@code emitBuffer()}. */
        private final Queue<V> overflow = (Queue) Queues.small().get();
        /** Downstream sink that receives records and errors. */
        private final FluxSink<V> sink;
        /** Stream key this subscription reads from. */
        private final K key;
        /** Demand, scheduling and offset bookkeeping for this subscription. */
        private final PollState pollState;
        /** Issues one read for a key and offset; its result is subscribed to on each scheduling round. */
        private final BiFunction<K, ReadOffset, Flux<V>> readFunction;

        /**
         * Captures the collaborators of a subscription. Nothing is registered on the sink here; that is
         * done by {@code arm()}.
         *
         * @param fluxSink the downstream sink.
         * @param k the stream key.
         * @param pollState the polling state.
         * @param biFunction the function performing a single read.
         */
        protected StreamSubscription(FluxSink<V> fluxSink, K k, PollState pollState, BiFunction<K, ReadOffset, Flux<V>> biFunction) {
            this.readFunction = biFunction;
            this.pollState = pollState;
            this.key = k;
            this.sink = fluxSink;
        }

        /**
         * Registers the request and cancel callbacks on the sink.
         * <p>
         * Each request adds its amount to the pending demand through {@code Operators.addCap} using a
         * compare-and-set retry loop and then triggers {@link #scheduleIfRequired()}. Requests arriving after
         * cancellation are dropped. Cancellation of the sink cancels the poll state.
         */
        void arm() {
            this.sink.onRequest(amount -> {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] onRequest(%d)", this.key, amount));
                }
                if (!this.pollState.isSubscriptionActive()) {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] onRequest(%d): Dropping, subscription canceled", this.key, amount));
                    }
                    return;
                }
                for (;;) {
                    final long current = this.pollState.getRequested();
                    if (current == Long.MAX_VALUE) {
                        // Demand is already at its maximum; there is nothing to add, just make sure a read is running.
                        scheduleIfRequired();
                        return;
                    }
                    final long updated = Operators.addCap(current, amount);
                    if (!this.pollState.setRequested(current, updated)) {
                        // Another thread changed the counter in between; re-read and try again.
                        continue;
                    }
                    if (updated > 0) {
                        scheduleIfRequired();
                    }
                    return;
                }
            });
            this.sink.onCancel(this.pollState::cancel);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Starts the next read if one is needed.
         * <p>
         * Does nothing when a read is already scheduled or the subscription has been cancelled. Otherwise
         * buffered records are emitted first while demand is available; if demand remains afterwards and the
         * scheduled flag can be acquired, the read function is invoked with the current read offset and its
         * result is subscribed to. With no demand left the subscription stays suspended until the next request.
         */
        public void scheduleIfRequired() {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired()", this.key));
            }
            if (this.pollState.isScheduled()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Already scheduled", this.key));
                }
                return;
            }
            if (!this.pollState.isSubscriptionActive()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Subscription cancelled", this.key));
                }
                return;
            }
            // Serve outstanding demand from the overflow buffer before hitting Redis again.
            if (this.pollState.getRequested() > 0 && !this.overflow.isEmpty()) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    // Logged at debug level to match the isDebugEnabled() guard and every other trace message here.
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Requested: %d Emit from buffer", this.key, Long.valueOf(this.pollState.getRequested())));
                }
                emitBuffer();
            }
            if (this.pollState.getRequested() == 0) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Subscriber has no demand. Suspending subscription.", this.key));
                }
            } else if (this.pollState.getRequested() > 0 && this.pollState.activateSchedule()) {
                // activateSchedule() succeeds for exactly one caller until scheduleCompleted() resets the flag.
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Activating subscription", this.key));
                }
                ReadOffset currentReadOffset = this.pollState.getCurrentReadOffset();
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] scheduleIfRequired(): Activating subscription, offset %s", this.key, currentReadOffset));
                }
                this.readFunction.apply(this.key, currentReadOffset).subscribe((CoreSubscriber<? super V>) getSubscriber());
            }
        }

        /**
         * Builds the subscriber attached to each read issued by {@link #scheduleIfRequired()}.
         * <p>
         * It requests {@code Long.MAX_VALUE} items from the read, forwards items and errors to this
         * subscription, and on completion clears the scheduled flag and asks for the next read.
         *
         * @return a new subscriber that shares the sink's context.
         */
        private CoreSubscriber<V> getSubscriber() {
            final CoreSubscriber<V> bridge = new CoreSubscriber<V>() {
                @Override // reactor.core.CoreSubscriber
                public Context currentContext() {
                    return StreamSubscription.this.sink.currentContext();
                }

                @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
                public void onSubscribe(org.reactivestreams.Subscription upstream) {
                    upstream.request(Long.MAX_VALUE);
                }

                @Override // org.reactivestreams.Subscriber
                public void onNext(V message) {
                    StreamSubscription.this.onStreamMessage(message);
                }

                @Override // org.reactivestreams.Subscriber
                public void onError(Throwable failure) {
                    StreamSubscription.this.onStreamError(failure);
                }

                @Override // org.reactivestreams.Subscriber
                public void onComplete() {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        String message = String.format("[stream: %s] onComplete()", StreamSubscription.this.key);
                        DefaultStreamReceiver.this.logger.debug(message);
                    }
                    // Release the scheduled flag first so the follow-up call is able to start another read.
                    StreamSubscription.this.pollState.scheduleCompleted();
                    StreamSubscription.this.scheduleIfRequired();
                }
            };
            return bridge;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles one record delivered by a read.
         * <p>
         * The read offset is advanced to the record's id first. The record is then passed to the sink when
         * demand is {@code Long.MAX_VALUE} (fast path, counter untouched) or when one unit of demand could be
         * consumed (slow path); in every other case it is placed in the overflow buffer.
         *
         * @param v the received record.
         */
        public void onStreamMessage(V v) {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] onStreamMessage(%s)", this.key, v));
            }
            this.pollState.updateReadOffset(v.getId().getValue());

            final long demand = this.pollState.getRequested();
            final boolean unbounded = demand == Long.MAX_VALUE;
            // decrementRequested() is only attempted for positive, bounded demand.
            final boolean emit = demand > 0 && (unbounded || this.pollState.decrementRequested());

            if (!emit) {
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] onStreamMessage(%s): Buffering overflow", this.key, v));
                }
                this.overflow.offer(v);
                return;
            }

            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                String pattern = unbounded
                        ? "[stream: %s] onStreamMessage(%s): Emitting item, fast-path"
                        : "[stream: %s] onStreamMessage(%s): Emitting item, slow-path";
                DefaultStreamReceiver.this.logger.debug(String.format(pattern, this.key, v));
            }
            this.sink.next(v);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Handles a failed read: the poll state is cancelled and the error is then signalled to the sink.
         *
         * @param th the error raised by the read.
         */
        public void onStreamError(Throwable th) {
            if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                String message = String.format("[stream: %s] onStreamError(%s)", this.key, th);
                DefaultStreamReceiver.this.logger.debug(message);
            }
            // Cancel before signalling so that no further read gets scheduled for this subscription.
            this.pollState.cancel();
            this.sink.error(th);
        }

        /**
         * Drains the overflow buffer into the sink for as long as demand is available.
         * <p>
         * With demand at {@code Long.MAX_VALUE} items are emitted without touching the counter. Otherwise one
         * unit of demand is reserved by compare-and-set before polling; a failed compare-and-set restarts the
         * iteration. If the queue turns out to be empty after a reservation, the reserved unit is given back.
         */
        private void emitBuffer() {
            while (!this.overflow.isEmpty()) {
                final long demand = this.pollState.getRequested();
                if (demand <= 0) {
                    return;
                }
                final boolean unbounded = demand == Long.MAX_VALUE;
                if (!unbounded && !this.pollState.setRequested(demand, demand - 1)) {
                    // The counter changed underneath us; start over with fresh values.
                    continue;
                }
                final V item = this.overflow.poll();
                if (item == null) {
                    if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                        DefaultStreamReceiver.this.logger.debug(String.format("[stream: %s] emitBuffer(): emission missed", this.key));
                    }
                    if (!unbounded) {
                        // Nothing was emitted, so return the unit of demand reserved above.
                        this.pollState.incrementRequested();
                    }
                    return;
                }
                if (DefaultStreamReceiver.this.logger.isDebugEnabled()) {
                    String pattern = unbounded
                            ? "[stream: %s] emitBuffer(%s): Emitting item from buffer, fast-path"
                            : "[stream: %s] emitBuffer(%s): Emitting item from buffer, slow-path";
                    DefaultStreamReceiver.this.logger.debug(String.format(pattern, this.key, item));
                }
                this.sink.next(item);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a receiver from a connection factory and receiver options.
     * <p>
     * The serialization context is built from the key, hash-key and hash-value serializers of the options.
     * The read options start empty; {@code count} is applied when a batch size is present and {@code block}
     * when the poll timeout is not zero. Stream operations are created with the configured hash mapper if
     * there is one.
     *
     * @param reactiveRedisConnectionFactory the connection factory backing the internal template.
     * @param streamReceiverOptions the receiver configuration.
     */
    public DefaultStreamReceiver(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory, StreamReceiver.StreamReceiverOptions<K, V> streamReceiverOptions) {
        this.receiverOptions = streamReceiverOptions;

        RedisSerializationContext serializationContext = RedisSerializationContext
                .newSerializationContext((RedisSerializationContext.SerializationPair<?>) streamReceiverOptions.getKeySerializer())
                .key(streamReceiverOptions.getKeySerializer())
                .hashKey(streamReceiverOptions.getHashKeySerializer())
                .hashValue(streamReceiverOptions.getHashValueSerializer())
                .build();

        StreamReadOptions options = StreamReadOptions.empty();
        if (streamReceiverOptions.getBatchSize().isPresent()) {
            options = options.count(streamReceiverOptions.getBatchSize().getAsInt());
        }
        if (!streamReceiverOptions.getPollTimeout().isZero()) {
            options = options.block(streamReceiverOptions.getPollTimeout());
        }
        this.readOptions = options;

        this.template = new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory, serializationContext);
        if (streamReceiverOptions.getHashMapper() == null) {
            this.streamOperations = this.template.opsForStream();
        } else {
            this.streamOperations = this.template.opsForStream(streamReceiverOptions.getHashMapper());
        }
    }

    /**
     * Receives records from a stream without a consumer.
     * <p>
     * The read function is chosen once per call: with a hash mapper configured the read is issued with the
     * configured target type, otherwise without one. A new poll state and subscription are created for
     * every subscriber of the returned {@link Flux}.
     *
     * @param streamOffset the stream key and the offset to start from.
     * @return a deferred {@link Flux} of the received records.
     */
    @Override // org.springframework.data.redis.stream.StreamReceiver
    public Flux<V> receive(StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receive(%s)", streamOffset));
        }
        // NOTE(review): the raw BiFunction is kept as found; giving it type arguments would also require
        // touching the StreamSubscription constructor signature.
        final BiFunction readFunction;
        if (this.receiverOptions.getHashMapper() == null) {
            readFunction = (streamKey, offset) -> this.streamOperations.read(this.readOptions, StreamOffset.create(streamKey, offset));
        } else {
            readFunction = (streamKey, offset) -> this.streamOperations.read(this.receiverOptions.getTargetType(), this.readOptions, StreamOffset.create(streamKey, offset));
        }
        return Flux.defer(() -> {
            PollState pollState = PollState.standalone(streamOffset.getOffset());
            return Flux.create(sink -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Receives records on behalf of {@code consumer}, reading with the options returned by
     * {@code readOptions.autoAcknowledge()}.
     * <p>
     * A new poll state and subscription are created for every subscriber of the returned {@link Flux}.
     *
     * @param consumer the consumer to read as.
     * @param streamOffset the stream key and the offset to start from.
     * @return a deferred {@link Flux} of the received records.
     */
    @Override // org.springframework.data.redis.stream.StreamReceiver
    public Flux<V> receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receiveAutoAck(%s, %s)", consumer, streamOffset));
        }
        StreamReadOptions autoAckOptions = this.readOptions.autoAcknowledge();
        BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> readFunction = getConsumeReadFunction(consumer, autoAckOptions);
        return Flux.defer(() -> {
            PollState pollState = PollState.consumer(consumer, streamOffset.getOffset());
            return Flux.create(sink -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Receives records on behalf of {@code consumer} using this receiver's read options unchanged.
     * <p>
     * A new poll state and subscription are created for every subscriber of the returned {@link Flux}.
     *
     * @param consumer the consumer to read as.
     * @param streamOffset the stream key and the offset to start from.
     * @return a deferred {@link Flux} of the received records.
     */
    @Override // org.springframework.data.redis.stream.StreamReceiver
    public Flux<V> receive(Consumer consumer, StreamOffset<K> streamOffset) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(String.format("receive(%s, %s)", consumer, streamOffset));
        }
        BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> readFunction = getConsumeReadFunction(consumer, this.readOptions);
        return Flux.defer(() -> {
            PollState pollState = PollState.consumer(consumer, streamOffset.getOffset());
            return Flux.create(sink -> new StreamSubscription(sink, streamOffset.getKey(), pollState, readFunction).arm());
        });
    }

    /**
     * Selects the function that performs a single consumer read.
     *
     * @param consumer the consumer to read as.
     * @param streamReadOptions the options passed to every read.
     * @return a function reading with the configured target type when a hash mapper is set, otherwise a
     *         function reading without a target type.
     */
    private BiFunction<K, ReadOffset, Flux<? extends Record<?, ?>>> getConsumeReadFunction(Consumer consumer, StreamReadOptions streamReadOptions) {
        if (this.receiverOptions.getHashMapper() == null) {
            return (streamKey, offset) -> this.streamOperations.read(consumer, streamReadOptions, StreamOffset.create(streamKey, offset));
        }
        return (streamKey, offset) -> this.streamOperations.read(this.receiverOptions.getTargetType(), consumer, streamReadOptions, StreamOffset.create(streamKey, offset));
    }
}
