package io.deephaven.server.table;

import com.google.rpc.Code;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.TableListener;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.BaseTable;
import io.deephaven.engine.table.impl.InstrumentedTableUpdateListener;
import io.deephaven.engine.table.impl.NotificationStepReceiver;
import io.deephaven.engine.table.impl.SwapListener;
import io.deephaven.engine.table.impl.UncoalescedTable;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.hash.KeyedLongObjectHashMap;
import io.deephaven.hash.KeyedLongObjectKey;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.proto.backplane.grpc.ExportNotification;
import io.deephaven.proto.backplane.grpc.ExportedTableUpdateMessage;
import io.deephaven.proto.backplane.grpc.Ticket;
import io.deephaven.proto.util.Exceptions;
import io.deephaven.proto.util.ExportTicketHelper;
import io.deephaven.server.session.SessionState;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import org.apache.commons.lang3.mutable.MutableLong;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/deephaven/server/table/ExportedTableUpdateListener.class */
public class ExportedTableUpdateListener implements StreamObserver<ExportNotification> {
    private final SessionState session;
    private final StreamObserver<ExportedTableUpdateMessage> responseObserver;
    private static final Logger log = LoggerFactory.getLogger(ExportedTableUpdateListener.class);
    private static final KeyedLongObjectKey<ListenerImpl> EXPORT_KEY = new KeyedLongObjectKey.BasicStrict<ListenerImpl>() { // from class: io.deephaven.server.table.ExportedTableUpdateListener.1
        public long getLongKey(@NotNull ListenerImpl listenerImpl) {
            return listenerImpl.exportId;
        }
    };
    private static final NotificationStepReceiver NOOP_NOTIFICATION_STEP_RECEIVER = j -> {
    };
    private final KeyedLongObjectHashMap<ListenerImpl> updateListenerMap = new KeyedLongObjectHashMap<>(EXPORT_KEY);
    private volatile boolean isDestroyed = false;
    private final String logPrefix = "ExportedTableUpdateListener(" + Integer.toHexString(System.identityHashCode(this)) + ") ";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/deephaven/server/table/ExportedTableUpdateListener$ListenerImpl.class */
    public class ListenerImpl extends InstrumentedTableUpdateListener {
        private final BaseTable table;
        private final int exportId;

        private ListenerImpl(BaseTable baseTable, int i) {
            super("ExportedTableUpdateListener (" + i + ")");
            this.table = baseTable;
            this.exportId = i;
            manage(baseTable);
        }

        public void onUpdate(TableUpdate tableUpdate) {
            ExportedTableUpdateListener.this.sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(this.exportId), this.table.size(), null);
        }

        public void onFailureInternal(Throwable th, TableListener.Entry entry) {
            ExportedTableUpdateListener.this.sendUpdateMessage(ExportTicketHelper.wrapExportIdInTicket(this.exportId), this.table.size(), th);
        }

        public void destroy() {
            super.destroy();
            this.table.removeUpdateListener(this);
        }
    }

    public ExportedTableUpdateListener(SessionState sessionState, StreamObserver<ExportedTableUpdateMessage> streamObserver) {
        this.session = sessionState;
        this.responseObserver = streamObserver;
    }

    /* JADX WARN: Finally extract failed */
    public void onNext(ExportNotification exportNotification) {
        ListenerImpl listenerImpl;
        if (this.isDestroyed) {
            throw Exceptions.statusRuntimeException(Code.CANCELLED, "client cancelled the stream");
        }
        Ticket ticket = exportNotification.getTicket();
        int ticketToExportId = ExportTicketHelper.ticketToExportId(ticket, "ticket");
        try {
            ExportNotification.State exportState = exportNotification.getExportState();
            if (exportState == ExportNotification.State.EXPORTED) {
                SessionState.ExportObject export = this.session.getExport(ticket, "ticket");
                if (export.tryRetainReference()) {
                    try {
                        Object obj = export.get();
                        if (obj instanceof BaseTable) {
                            onNewTableExport(ticket, ticketToExportId, (BaseTable) obj);
                        }
                        export.dropReference();
                    } catch (Throwable th) {
                        export.dropReference();
                        throw th;
                    }
                }
            } else if (SessionState.isExportStateTerminal(exportState) && (listenerImpl = (ListenerImpl) this.updateListenerMap.remove(ticketToExportId)) != null) {
                listenerImpl.dropReference();
            }
        } catch (StatusRuntimeException e) {
        }
    }

    public void onError(Throwable th) {
        onCompleted();
    }

    public synchronized void onCompleted() {
        if (this.isDestroyed) {
            return;
        }
        this.isDestroyed = true;
        GrpcUtil.safelyComplete(this.responseObserver);
        this.updateListenerMap.forEach((v0) -> {
            v0.dropReference();
        });
        this.updateListenerMap.clear();
        log.info().append(this.logPrefix).append("is complete").endl();
    }

    private synchronized void onNewTableExport(Ticket ticket, int i, BaseTable baseTable) {
        if (baseTable instanceof UncoalescedTable) {
            return;
        }
        if (!baseTable.isRefreshing()) {
            sendUpdateMessage(ticket, baseTable.size(), null);
            return;
        }
        if (this.updateListenerMap.contains(Integer.valueOf(i))) {
            return;
        }
        SwapListener swapListener = new SwapListener(baseTable);
        swapListener.subscribeForUpdates();
        ListenerImpl listenerImpl = new ListenerImpl(baseTable, i);
        listenerImpl.tryRetainReference();
        this.updateListenerMap.put(i, listenerImpl);
        MutableLong mutableLong = new MutableLong();
        BaseTable.initializeWithSnapshot(this.logPrefix, swapListener, (z, j) -> {
            swapListener.setListenerAndResult(listenerImpl, NOOP_NOTIFICATION_STEP_RECEIVER);
            TrackingRowSet rowSet = baseTable.getRowSet();
            mutableLong.setValue(z ? rowSet.sizePrev() : rowSet.size());
            return true;
        });
        sendUpdateMessage(ticket, mutableLong.longValue(), null);
    }

    private synchronized void sendUpdateMessage(Ticket ticket, long j, Throwable th) {
        if (this.isDestroyed) {
            return;
        }
        ExportedTableUpdateMessage.Builder size = ExportedTableUpdateMessage.newBuilder().setExportId(ticket).setSize(j);
        if (th != null) {
            size.setUpdateFailureMessage(th.getMessage());
        }
        try {
            this.responseObserver.onNext(size.build());
        } catch (RuntimeException e) {
            log.debug().append(this.logPrefix).append("failed to notify listener of state change: ").append(e).endl();
            this.session.removeExportListener(this);
        }
    }
}
