package reactor.aeron;

import io.aeron.Publication;
import io.aeron.logbuffer.BufferClaim;
import java.time.Duration;
import java.util.Queue;
import java.util.function.Consumer;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/MessagePublication.class */
/**
 * Wraps an Aeron {@link Publication} and publishes queued messages on behalf of callers.
 *
 * <p>Callers enqueue work through {@link #publish(Object, DirectBufferHandler)}; the queued tasks
 * are drained by {@link #proceed()}. Each task is retried on subsequent {@code proceed()} calls
 * until it either succeeds, fails with an exception, or exceeds the timeout configured for the
 * status code the publication keeps returning.
 *
 * <p>NOTE(review): {@link #close()} insists on running inside the event loop and the task queue
 * is an Agrona many-to-one queue, so {@code proceed()} is presumably meant to be called from the
 * event loop thread only - confirm against {@code AeronEventLoop}.
 */
public class MessagePublication implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(MessagePublication.class);

    /** Capacity of the queue holding tasks that wait to be published. */
    private static final int QUEUE_CAPACITY = 8192;

    private final Publication publication;
    private final AeronEventLoop eventLoop;
    private final Duration connectTimeout;
    private final Duration backpressureTimeout;
    private final Duration adminActionTimeout;

    /** Upper bound on the number of tasks attempted by a single {@link #proceed()} call. */
    private final int writeLimit;

    private final Queue<PublishTask<?>> publishTasks = new ManyToOneConcurrentArrayQueue<>(QUEUE_CAPACITY);

    /** Completed by {@link #close()} once the publication has been closed and the queue drained. */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * A single pending message together with the sink that reports its outcome.
     *
     * <p>Decompiler note: this class and its members were private in the original sources; the
     * widened modifiers are kept here so existing references keep compiling.
     *
     * @param <B> type of the message buffer handed in by the caller
     */
    public static class PublishTask<B> {
        private static final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
        private static final long NANOS_PER_MILLI = 1_000_000L;

        private final Publication publication;
        private final B buffer;
        private final DirectBufferHandler<? super B> bufferHandler;
        private final MonoSink<Void> sink;

        /** Set when the sink is disposed; volatile because it is written from the sink's dispose callback. */
        private volatile boolean isDisposed;

        /** Whether {@link #publish()} has been attempted at least once. */
        private boolean started;

        /** {@link System#nanoTime()} reading taken at the first publish attempt. */
        private long startNanos;

        private PublishTask(
                Publication publication, B buffer, DirectBufferHandler<? super B> bufferHandler, MonoSink<Void> sink) {
            this.publication = publication;
            this.buffer = buffer;
            this.bufferHandler = bufferHandler;
            this.sink = sink;
        }

        /**
         * Creates a task and registers it to be flagged as disposed when the sink is disposed.
         *
         * @param publication publication the message is written to
         * @param b message buffer
         * @param directBufferHandler handler used to measure, write and release {@code b}
         * @param monoSink sink that receives the outcome of the publish attempt
         * @param <B> type of the message buffer
         * @return the new task
         */
        public static <B> PublishTask<B> newInstance(
                Publication publication,
                B b,
                DirectBufferHandler<? super B> directBufferHandler,
                MonoSink<Void> monoSink) {
            PublishTask<B> task = new PublishTask<>(publication, b, directBufferHandler, monoSink);
            // Registered here rather than in the constructor so "this" does not escape construction.
            monoSink.onDispose(task::onDispose);
            return task;
        }

        /**
         * Attempts to write the message to the publication.
         *
         * <p>Messages shorter than {@code maxPayloadLength()} are written in place through
         * {@code tryClaim}; anything else goes through {@code offer}.
         *
         * @return the value returned by the publication (positive on success, negative status
         *     code otherwise), or {@code 1} without publishing if the sink was already disposed,
         *     so the caller treats the task as finished
         */
        public long publish() {
            if (isDisposed) {
                return 1L;
            }
            if (!started) {
                // The timeout clock starts at the first real attempt, not at enqueue time.
                started = true;
                startNanos = System.nanoTime();
            }

            int length = bufferHandler.estimateLength(buffer);
            if (length >= publication.maxPayloadLength()) {
                return publication.offer(bufferHandler.map(buffer, length));
            }

            BufferClaim bufferClaim = bufferClaims.get();
            long result = publication.tryClaim(length, bufferClaim);
            if (result > 0) {
                try {
                    bufferHandler.write(bufferClaim.buffer(), bufferClaim.offset(), buffer, length);
                    bufferClaim.commit();
                } catch (Exception e) {
                    // A claimed range must be committed or aborted; abort so the claim is not left open.
                    bufferClaim.abort();
                    throw Exceptions.propagate(e);
                }
            }
            return result;
        }

        /**
         * Tells whether more than {@code duration} has passed since the first publish attempt.
         *
         * <p>Elapsed time is measured with the monotonic {@link System#nanoTime()} so wall-clock
         * adjustments cannot trigger or suppress a timeout. The comparison is done in
         * milliseconds.
         *
         * @param duration allowed time
         * @return {@code true} if the allowed time has been exceeded; {@code false} if it has not
         *     or if no publish attempt has been made yet
         */
        public boolean isTimeoutElapsed(Duration duration) {
            if (!started) {
                return false;
            }
            long elapsedMillis = (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
            return elapsedMillis > duration.toMillis();
        }

        /** Completes the sink (unless it was disposed) and releases the buffer. */
        public void success() {
            try {
                if (!isDisposed) {
                    sink.success();
                }
            } finally {
                // Release the buffer even if the sink callback throws.
                bufferHandler.dispose(buffer);
            }
        }

        /**
         * Fails the sink (unless it was disposed) and releases the buffer.
         *
         * @param th the failure to report
         */
        public void error(Throwable th) {
            try {
                if (!isDisposed) {
                    sink.error(th);
                }
            } finally {
                // Release the buffer even if the sink callback throws.
                bufferHandler.dispose(buffer);
            }
        }

        /** Sink dispose callback: marks the task so that it is skipped and its outcome not signalled. */
        private void onDispose() {
            isDisposed = true;
        }
    }

    /**
     * Creates a message publication.
     *
     * @param publication the Aeron publication to write to
     * @param aeronOptions source of the connect, backpressure and admin-action timeouts and of
     *     the write limit
     * @param aeronEventLoop event loop that owns this publication
     */
    public MessagePublication(Publication publication, AeronOptions aeronOptions, AeronEventLoop aeronEventLoop) {
        this.publication = publication;
        this.eventLoop = aeronEventLoop;
        this.connectTimeout = aeronOptions.connectTimeout();
        this.backpressureTimeout = aeronOptions.backpressureTimeout();
        this.adminActionTimeout = aeronOptions.adminActionTimeout();
        this.writeLimit = aeronOptions.resources().writeLimit();
    }

    /**
     * Enqueues a message for publishing.
     *
     * <p>Nothing happens until the returned {@code Mono} is subscribed. On subscription the
     * message is queued; if this publication is already closed or the queue is full, the
     * {@code Mono} fails with {@code AeronExceptions.failWithPublicationUnavailable()}.
     *
     * <p>NOTE(review): on that rejection path the buffer is not passed to
     * {@code directBufferHandler.dispose} - confirm the caller is expected to release it.
     *
     * @param b message buffer
     * @param directBufferHandler handler used to measure, write and release {@code b}
     * @param <B> type of the message buffer
     * @return a {@code Mono} that completes once the message was handed to the publication
     */
    public <B> Mono<Void> publish(B b, DirectBufferHandler<? super B> directBufferHandler) {
        return Mono.create(sink -> {
            boolean enqueued = !isDisposed()
                    && publishTasks.offer(PublishTask.newInstance(publication, b, directBufferHandler, sink));
            if (!enqueued) {
                sink.error(AeronExceptions.failWithPublicationUnavailable());
            }
        });
    }

    /**
     * Publishes queued tasks, attempting at most {@code writeLimit} of them and stopping at the
     * first one that cannot be published right now.
     *
     * @return number of tasks published by this call
     */
    public int proceed() {
        int published = 0;
        for (int attempt = 0; attempt < writeLimit; attempt++) {
            int current = proceed0();
            if (current < 1) {
                break;
            }
            published += current;
        }
        return published;
    }

    /**
     * Attempts the task at the head of the queue.
     *
     * <p>The task is removed on success, on an exception thrown by the attempt, or when the
     * timeout matching the returned status code has elapsed. A {@code CLOSED} or
     * {@code MAX_POSITION_EXCEEDED} result triggers {@link #dispose()} and leaves the task in the
     * queue. Otherwise the task stays queued for the next call.
     *
     * @return {@code 1} if the head task was published, {@code 0} otherwise
     */
    private int proceed0() {
        PublishTask<?> task = publishTasks.peek();
        if (task == null) {
            return 0;
        }

        long result = 0;
        Exception failure = null;
        try {
            result = task.publish();
        } catch (Exception e) {
            failure = e;
        }

        if (result > 0) {
            publishTasks.poll();
            task.success();
            return 1;
        }

        if (result == Publication.CLOSED) {
            logger.warn("aeron.Publication is CLOSED: {}", this);
            dispose();
            return 0;
        }

        if (result == Publication.MAX_POSITION_EXCEEDED) {
            logger.warn("aeron.Publication received MAX_POSITION_EXCEEDED: {}", this);
            dispose();
            return 0;
        }

        if (result == Publication.NOT_CONNECTED && task.isTimeoutElapsed(connectTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve NOT_CONNECTED within {} ms, {}",
                    connectTimeout.toMillis(),
                    this);
            failure = AeronExceptions.failWithPublication("Failed to resolve NOT_CONNECTED within timeout");
        }

        if (result == Publication.BACK_PRESSURED && task.isTimeoutElapsed(backpressureTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve BACK_PRESSURED within {} ms, {}",
                    backpressureTimeout.toMillis(),
                    this);
            failure = AeronExceptions.failWithPublication("Failed to resolve BACK_PRESSURED within timeout");
        }

        if (result == Publication.ADMIN_ACTION && task.isTimeoutElapsed(adminActionTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve ADMIN_ACTION within {} ms, {}",
                    adminActionTimeout.toMillis(),
                    this);
            failure = AeronExceptions.failWithPublication("Failed to resolve ADMIN_ACTION within timeout");
        }

        if (failure != null) {
            publishTasks.poll();
            task.error(failure);
        }
        return 0;
    }

    /**
     * Closes the underlying publication, fails every queued task with a cancel error and
     * completes {@link #onDispose()}.
     *
     * <p>Must be called from the event loop; otherwise the exception created by
     * {@code AeronExceptions.failWithResourceDisposal} is thrown and nothing is closed. The
     * queue is drained and {@code onDispose} completed even if closing the publication fails;
     * that failure is logged and rethrown.
     */
    public void close() {
        if (!eventLoop.inEventLoop()) {
            throw AeronExceptions.failWithResourceDisposal("aeron publication");
        }
        try {
            publication.close();
            logger.debug("Disposed {}", this);
        } catch (Exception e) {
            logger.warn("{} failed on aeron.Publication close(): {}", this, e.toString());
            throw Exceptions.propagate(e);
        } finally {
            disposePublishTasks();
            onDispose.onComplete();
        }
    }

    /**
     * Returns the session id of the underlying publication.
     *
     * @return the publication's session id
     */
    public int sessionId() {
        return publication.sessionId();
    }

    /**
     * Tells whether the underlying publication has been closed.
     *
     * @return {@code true} if the publication is closed
     */
    public boolean isDisposed() {
        return publication.isClosed();
    }

    /**
     * Asks the event loop to dispose this publication. The call does not wait for the outcome;
     * subscribe to {@link #onDispose()} to observe completion.
     */
    public void dispose() {
        eventLoop.disposePublication(this).subscribe(null, ignored -> {
            // Deliberately ignored: this is fire-and-forget. close() logs its own failures.
            // NOTE(review): presumably disposePublication ends up invoking close() - confirm.
        });
    }

    /**
     * Returns a {@code Mono} that completes once {@link #close()} has finished its cleanup.
     *
     * @return the dispose signal
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return onDispose;
    }

    /**
     * Waits for the publication to become connected.
     *
     * <p>The connection check is retried with a first and maximum backoff of 100 ms. The number
     * of retries is {@code connectTimeout / 100 ms}, but at least one.
     *
     * @return a {@code Mono} emitting this instance once connected, or failing if the
     *     publication is still not connected after the retries
     */
    public Mono<MessagePublication> ensureConnected() {
        return Mono.defer(() -> {
            Duration retryInterval = Duration.ofMillis(100L);
            long retryCount = Math.max(connectTimeout.toMillis() / retryInterval.toMillis(), 1L);
            return ensureConnected0()
                    .retryBackoff(retryCount, retryInterval, retryInterval)
                    .doOnError(th -> logger.warn("aeron.Publication is not connected after several retries"))
                    .thenReturn(this);
        });
    }

    /**
     * Performs one connection check, evaluated at subscription time.
     *
     * @return an empty {@code Mono} if the publication is connected, otherwise an error
     */
    private Mono<Void> ensureConnected0() {
        return Mono.defer(() -> {
            if (publication.isConnected()) {
                return Mono.empty();
            }
            return Mono.error(AeronExceptions.failWithPublication("aeron.Publication is not connected"));
        });
    }

    /**
     * Drains the queue, failing every remaining task with a cancel error. This is best-effort:
     * a task whose cancellation throws is logged and skipped so the remaining tasks are still
     * processed.
     */
    private void disposePublishTasks() {
        PublishTask<?> task;
        while ((task = publishTasks.poll()) != null) {
            try {
                task.error(AeronExceptions.failWithCancel("PublishTask has cancelled"));
            } catch (Exception e) {
                logger.warn("{} failed to cancel pending PublishTask: {}", this, e.toString());
            }
        }
    }

    @Override
    public String toString() {
        return "MessagePublication{pub=" + publication.channel() + "}";
    }
}
