package org.apache.qpid.protonj2.client.impl;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.Source;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.StreamReceiver;
import org.apache.qpid.protonj2.client.StreamReceiverOptions;
import org.apache.qpid.protonj2.client.Target;
import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.Connection;
import org.apache.qpid.protonj2.engine.Engine;
import org.apache.qpid.protonj2.engine.EventHandler;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.types.messaging.Released;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.class */
public final class ClientStreamReceiver implements StreamReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
    private static final AtomicIntegerFieldUpdater<ClientStreamReceiver> CLOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ClientStreamReceiver.class, "closed");
    private final ClientFuture<Receiver> openFuture;
    private final ClientFuture<Receiver> closeFuture;
    private ClientFuture<Receiver> drainingFuture;
    private ScheduledFuture<?> drainingTimeout;
    private final StreamReceiverOptions options;
    private final ClientSession session;
    private final ScheduledExecutorService executor;
    private final String receiverId;
    private final Map<ClientFuture<StreamDelivery>, ScheduledFuture<?>> receiveRequests = new LinkedHashMap();
    private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
    private volatile int closed;
    private ClientException failureCause;
    private volatile Source remoteSource;
    private volatile Target remoteTarget;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientStreamReceiver(ClientSession clientSession, StreamReceiverOptions streamReceiverOptions, String str, org.apache.qpid.protonj2.engine.Receiver receiver) {
        this.options = streamReceiverOptions;
        this.session = clientSession;
        this.receiverId = str;
        this.executor = clientSession.getScheduler();
        this.openFuture = clientSession.getFutureFactory().createFuture();
        this.closeFuture = clientSession.getFutureFactory().createFuture();
        this.protonReceiver = receiver.setLinkedResource(this);
        if (streamReceiverOptions.creditWindow() > 0) {
            this.protonReceiver.addCredit(streamReceiverOptions.creditWindow());
        }
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientInstance client() {
        return this.session.client();
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientConnection connection() {
        return this.session.connection();
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientSession session() {
        return this.session;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientFuture<Receiver> openFuture() {
        return this.openFuture;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver, java.lang.AutoCloseable
    public void close() {
        try {
            doCloseOrDetach(true, null).get();
        } catch (InterruptedException | ExecutionException e) {
            Thread.interrupted();
        }
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public void close(ErrorCondition errorCondition) {
        Objects.requireNonNull(errorCondition, "Error Condition cannot be null");
        try {
            doCloseOrDetach(true, errorCondition).get();
        } catch (InterruptedException | ExecutionException e) {
            Thread.interrupted();
        }
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public void detach() {
        try {
            doCloseOrDetach(false, null).get();
        } catch (InterruptedException | ExecutionException e) {
            Thread.interrupted();
        }
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public void detach(ErrorCondition errorCondition) {
        Objects.requireNonNull(errorCondition, "Error Condition cannot be null");
        try {
            doCloseOrDetach(false, errorCondition).get();
        } catch (InterruptedException | ExecutionException e) {
            Thread.interrupted();
        }
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientFuture<Receiver> closeAsync() {
        return doCloseOrDetach(true, null);
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientFuture<Receiver> closeAsync(ErrorCondition errorCondition) {
        Objects.requireNonNull(errorCondition, "Error Condition cannot be null");
        return doCloseOrDetach(true, errorCondition);
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientFuture<Receiver> detachAsync() {
        return doCloseOrDetach(false, null);
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public ClientFuture<Receiver> detachAsync(ErrorCondition errorCondition) {
        Objects.requireNonNull(errorCondition, "The provided Error Condition cannot be null");
        return doCloseOrDetach(false, errorCondition);
    }

    private ClientFuture<Receiver> doCloseOrDetach(boolean z, ErrorCondition errorCondition) {
        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
            this.executor.execute(() -> {
                if (this.protonReceiver.isLocallyOpen()) {
                    try {
                        this.protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(errorCondition));
                        if (z) {
                            this.protonReceiver.close();
                        } else {
                            this.protonReceiver.detach();
                        }
                    } catch (Throwable th) {
                        this.closeFuture.complete(this);
                    }
                }
            });
        }
        return this.closeFuture;
    }

    @Override // org.apache.qpid.protonj2.client.StreamReceiver, org.apache.qpid.protonj2.client.Receiver
    public StreamDelivery receive() throws ClientException {
        return receive(-1L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.qpid.protonj2.client.StreamReceiver, org.apache.qpid.protonj2.client.Receiver
    public StreamDelivery receive(long j, TimeUnit timeUnit) throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                IncomingDelivery incomingDelivery = null;
                Iterator it = this.protonReceiver.unsettled().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    IncomingDelivery incomingDelivery2 = (IncomingDelivery) it.next();
                    if (incomingDelivery2.getLinkedResource() == null) {
                        incomingDelivery = incomingDelivery2;
                        break;
                    }
                }
                if (incomingDelivery != null) {
                    createFuture.complete(new ClientStreamDelivery(this, incomingDelivery));
                } else if (j == 0) {
                    createFuture.complete(null);
                } else {
                    this.receiveRequests.put(createFuture, j > 0 ? this.session.getScheduler().schedule(() -> {
                        this.receiveRequests.remove(createFuture);
                        createFuture.complete(null);
                    }, j, timeUnit) : null);
                }
            }
        });
        return (StreamDelivery) this.session.request(this, createFuture);
    }

    @Override // org.apache.qpid.protonj2.client.StreamReceiver, org.apache.qpid.protonj2.client.Receiver
    public StreamDelivery tryReceive() throws ClientException {
        checkClosedOrFailed();
        return receive(0L, TimeUnit.MILLISECONDS);
    }

    @Override // org.apache.qpid.protonj2.client.StreamReceiver, org.apache.qpid.protonj2.client.Receiver
    public StreamReceiver addCredit(int i) throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                if (this.options.creditWindow() != 0) {
                    createFuture.failed(new ClientIllegalStateException("Cannot add credit when a credit window has been configured"));
                    return;
                }
                if (this.protonReceiver.isDraining()) {
                    createFuture.failed(new ClientIllegalStateException("Cannot add credit while a drain is pending"));
                    return;
                }
                try {
                    this.protonReceiver.addCredit(i);
                    createFuture.complete(this);
                } catch (Exception e) {
                    createFuture.failed(ClientExceptionSupport.createNonFatalOrPassthrough(e));
                }
            }
        });
        return (StreamReceiver) this.session.request(this, createFuture);
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public Future<Receiver> drain() throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                if (this.protonReceiver.isDraining()) {
                    createFuture.failed(new ClientIllegalStateException("StreamReceiver is already draining"));
                    return;
                }
                try {
                    if (this.protonReceiver.drain()) {
                        this.drainingFuture = createFuture;
                        this.drainingTimeout = this.session.scheduleRequestTimeout(this.drainingFuture, this.options.drainTimeout(), () -> {
                            return new ClientOperationTimedOutException("Timed out waiting for remote to respond to drain request");
                        });
                    } else {
                        createFuture.complete(this);
                    }
                } catch (Exception e) {
                    createFuture.failed(ClientExceptionSupport.createNonFatalOrPassthrough(e));
                }
            }
        });
        return createFuture;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public Map<String, Object> properties() throws ClientException {
        waitForOpenToComplete();
        return ClientConversionSupport.toStringKeyedMap(this.protonReceiver.getRemoteProperties());
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public String[] offeredCapabilities() throws ClientException {
        waitForOpenToComplete();
        return ClientConversionSupport.toStringArray(this.protonReceiver.getRemoteOfferedCapabilities());
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public String[] desiredCapabilities() throws ClientException {
        waitForOpenToComplete();
        return ClientConversionSupport.toStringArray(this.protonReceiver.getRemoteDesiredCapabilities());
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public String address() throws ClientException {
        if (isDynamic()) {
            waitForOpenToComplete();
            return this.protonReceiver.getRemoteSource().getAddress();
        }
        if (this.protonReceiver.getSource() != null) {
            return this.protonReceiver.getSource().getAddress();
        }
        return null;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public Source source() throws ClientException {
        waitForOpenToComplete();
        return this.remoteSource;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public Target target() throws ClientException {
        waitForOpenToComplete();
        return this.remoteTarget;
    }

    @Override // org.apache.qpid.protonj2.client.Receiver
    public long queuedDeliveries() throws ClientException {
        checkClosedOrFailed();
        ClientFuture createFuture = this.session.getFutureFactory().createFuture();
        this.executor.execute(() -> {
            if (notClosedOrFailed(createFuture)) {
                int i = 0;
                Iterator it = this.protonReceiver.unsettled().iterator();
                while (it.hasNext()) {
                    if (((IncomingDelivery) it.next()).getLinkedResource() == null) {
                        i++;
                    }
                }
                createFuture.complete(Integer.valueOf(i));
            }
        });
        return ((Integer) this.session.request(this, createFuture)).intValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientStreamReceiver open() {
        this.protonReceiver.localOpenHandler(this::handleLocalOpen).localCloseHandler(this::handleLocalCloseOrDetach).localDetachHandler(this::handleLocalCloseOrDetach).openHandler(this::handleRemoteOpen).closeHandler(this::handleRemoteCloseOrDetach).detachHandler(this::handleRemoteCloseOrDetach).parentEndpointClosedHandler(this::handleParentEndpointClosed).deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated).deliveryReadHandler(this::handleDeliveryRead).deliveryAbortedHandler(this::handleDeliveryAborted).creditStateUpdateHandler(this::handleReceiverCreditUpdated).engineShutdownHandler(this::handleEngineShutdown).open();
        return this;
    }

    void setFailureCause(ClientException clientException) {
        this.failureCause = clientException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientException getFailureCause() {
        return this.failureCause == null ? this.session.getFailureCause() : this.failureCause;
    }

    String getId() {
        return this.receiverId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.closed > 0;
    }

    boolean isDynamic() {
        return this.protonReceiver.getSource() != null && this.protonReceiver.getSource().isDynamic();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamReceiverOptions receiverOptions() {
        return this.options;
    }

    private void handleLocalOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
        if (this.options.openTimeout() > 0) {
            this.executor.schedule(() -> {
                if (this.openFuture.isDone()) {
                    return;
                }
                immediateLinkShutdown(new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
            }, this.options.openTimeout(), TimeUnit.MILLISECONDS);
        }
    }

    private void handleLocalCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
        if (receiver.getEngine().isShutdown() || this.failureCause != null || !receiver.isRemotelyOpen()) {
            immediateLinkShutdown(this.failureCause);
            return;
        }
        long closeTimeout = this.options.closeTimeout();
        if (closeTimeout > 0) {
            this.session.scheduleRequestTimeout(this.closeFuture, closeTimeout, () -> {
                return new ClientOperationTimedOutException("receiver close timed out waiting for remote to respond");
            });
        }
    }

    private void handleRemoteOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
        if (receiver.getRemoteSource() == null) {
            LOG.debug("Receiver opened but remote signalled close is pending: {}", this.receiverId);
            return;
        }
        this.remoteSource = new ClientRemoteSource(receiver.getRemoteSource());
        if (receiver.getRemoteTarget() != null) {
            this.remoteTarget = new ClientRemoteTarget(receiver.getRemoteTarget());
        }
        replenishCreditIfNeeded();
        this.openFuture.complete(this);
        LOG.trace("Receiver opened successfully: {}", this.receiverId);
    }

    private void handleRemoteCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
        if (receiver.isLocallyOpen()) {
            immediateLinkShutdown(ClientExceptionSupport.convertToLinkClosedException(receiver.getRemoteCondition(), "Receiver remotely closed without explanation from the remote"));
        } else {
            immediateLinkShutdown(this.failureCause);
        }
    }

    private void handleParentEndpointClosed(org.apache.qpid.protonj2.engine.Receiver receiver) {
        if (receiver.getEngine().isRunning()) {
            immediateLinkShutdown(receiver.getConnection().getRemoteCondition() != null ? ClientExceptionSupport.convertToConnectionClosedException(receiver.getConnection().getRemoteCondition()) : receiver.getSession().getRemoteCondition() != null ? ClientExceptionSupport.convertToSessionClosedException(receiver.getSession().getRemoteCondition()) : receiver.getEngine().failureCause() != null ? ClientExceptionSupport.convertToConnectionClosedException(receiver.getEngine().failureCause()) : !isClosed() ? new ClientResourceRemotelyClosedException("Remote closed without a specific error condition") : null);
        }
    }

    private void handleEngineShutdown(Engine engine) {
        if (isDynamic() || this.session.getConnection().getEngine().isShutdown()) {
            Connection connection = engine.connection();
            immediateLinkShutdown(connection.getRemoteCondition() != null ? ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition()) : engine.failureCause() != null ? ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause()) : !isClosed() ? new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition") : null);
            return;
        }
        int credit = this.protonReceiver.getCredit() + this.protonReceiver.unsettled().size();
        if (this.drainingFuture != null) {
            this.drainingFuture.complete(this);
            if (this.drainingTimeout != null) {
                this.drainingTimeout.cancel(false);
                this.drainingTimeout = null;
            }
        }
        this.protonReceiver.localCloseHandler((EventHandler) null);
        this.protonReceiver.localDetachHandler((EventHandler) null);
        this.protonReceiver.close();
        this.protonReceiver = ClientReceiverBuilder.recreateReceiver(this.session, this.protonReceiver, this.options);
        this.protonReceiver.setLinkedResource(this);
        this.protonReceiver.addCredit(credit);
        open();
    }

    private void handleDeliveryRead(IncomingDelivery incomingDelivery) {
        LOG.trace("Delivery data was received: {}", incomingDelivery);
        if (incomingDelivery.getDefaultDeliveryState() == null) {
            incomingDelivery.setDefaultDeliveryState(Released.getInstance());
        }
        if (incomingDelivery.getLinkedResource() != null || this.receiveRequests.isEmpty()) {
            return;
        }
        Iterator<Map.Entry<ClientFuture<StreamDelivery>, ScheduledFuture<?>>> it = this.receiveRequests.entrySet().iterator();
        Map.Entry<ClientFuture<StreamDelivery>, ScheduledFuture<?>> next = it.next();
        if (next.getValue() != null) {
            next.getValue().cancel(false);
        }
        try {
            next.getKey().complete(new ClientStreamDelivery(this, incomingDelivery));
            it.remove();
        } catch (Throwable th) {
            it.remove();
            throw th;
        }
    }

    private void handleDeliveryAborted(IncomingDelivery incomingDelivery) {
        LOG.trace("Delivery data was aborted: {}", incomingDelivery);
        incomingDelivery.settle();
        replenishCreditIfNeeded();
    }

    private void handleDeliveryStateRemotelyUpdated(IncomingDelivery incomingDelivery) {
        LOG.trace("Delivery remote state was updated: {}", incomingDelivery);
    }

    private void handleReceiverCreditUpdated(org.apache.qpid.protonj2.engine.Receiver receiver) {
        LOG.trace("Receiver credit update by remote: {}", receiver);
        if (this.drainingFuture == null || receiver.getCredit() != 0) {
            return;
        }
        this.drainingFuture.complete(this);
        if (this.drainingTimeout != null) {
            this.drainingTimeout.cancel(false);
            this.drainingTimeout = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void disposition(IncomingDelivery incomingDelivery, DeliveryState deliveryState, boolean z) throws ClientException {
        checkClosedOrFailed();
        asyncApplyDisposition(incomingDelivery, deliveryState, z);
    }

    private void asyncApplyDisposition(IncomingDelivery incomingDelivery, DeliveryState deliveryState, boolean z) throws ClientException {
        this.executor.execute(() -> {
            this.session.getTransactionContext().disposition(incomingDelivery, deliveryState, z);
            replenishCreditIfNeeded();
        });
    }

    private void replenishCreditIfNeeded() {
        int creditWindow = this.options.creditWindow();
        if (creditWindow > 0) {
            int credit = this.protonReceiver.getCredit();
            if (credit <= creditWindow * 0.5d) {
                int size = credit + this.protonReceiver.unsettled().size();
                if (size <= creditWindow * 0.7d) {
                    int i = creditWindow - size;
                    LOG.trace("Consumer granting additional credit: {}", Integer.valueOf(i));
                    try {
                        this.protonReceiver.addCredit(i);
                    } catch (Exception e) {
                        LOG.debug("Error caught during credit top-up", e);
                    }
                }
            }
        }
    }

    private void waitForOpenToComplete() throws ClientException {
        if (!this.openFuture.isComplete() || this.openFuture.isFailed()) {
            try {
                this.openFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                Thread.interrupted();
                if (this.failureCause == null) {
                    throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause());
                }
                throw this.failureCause;
            }
        }
    }

    private boolean notClosedOrFailed(ClientFuture<?> clientFuture) {
        if (isClosed()) {
            clientFuture.failed(new ClientIllegalStateException("The Receiver was explicity closed", this.failureCause));
            return false;
        }
        if (this.failureCause == null) {
            return true;
        }
        clientFuture.failed(this.failureCause);
        return false;
    }

    protected void checkClosedOrFailed() throws ClientException {
        if (isClosed()) {
            throw new ClientIllegalStateException("The Receiver was explicity closed", this.failureCause);
        }
        if (this.failureCause != null) {
            throw this.failureCause;
        }
    }

    private void immediateLinkShutdown(ClientException clientException) {
        if (this.failureCause == null) {
            this.failureCause = clientException;
        }
        try {
            if (this.protonReceiver.isRemotelyDetached()) {
                this.protonReceiver.detach();
            } else {
                this.protonReceiver.close();
            }
        } catch (Exception e) {
        } finally {
            this.session.closeAsync();
        }
        this.receiveRequests.forEach((clientFuture, scheduledFuture) -> {
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
            }
            if (clientException != null) {
                clientFuture.failed(clientException);
            } else {
                clientFuture.failed(new ClientResourceRemotelyClosedException("The Stream Receiver has closed"));
            }
        });
        this.protonReceiver.unsettled().forEach(incomingDelivery -> {
            if (incomingDelivery.getLinkedResource() != null) {
                try {
                    ((ClientStreamDelivery) incomingDelivery.getLinkedResource(ClientStreamDelivery.class)).handleReceiverClosed(this);
                } catch (Exception e2) {
                }
            }
        });
        if (clientException != null) {
            this.openFuture.failed(clientException);
            if (this.drainingFuture != null) {
                this.drainingFuture.failed(clientException);
            }
        } else {
            this.openFuture.complete(this);
            if (this.drainingFuture != null) {
                this.drainingFuture.failed(new ClientResourceRemotelyClosedException("The Receiver has been closed"));
            }
        }
        if (this.drainingTimeout != null) {
            this.drainingTimeout.cancel(false);
            this.drainingTimeout = null;
        }
        this.closeFuture.complete(this);
    }
}
