/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.event.processor;

import io.zeebe.broker.logstreams.processor.MetadataFilter;
import io.zeebe.broker.logstreams.processor.NoopSnapshotSupport;
import io.zeebe.broker.transport.clientapi.SubscribedEventWriter;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.processor.EventProcessor;
import io.zeebe.logstreams.processor.StreamProcessor;
import io.zeebe.logstreams.processor.StreamProcessorContext;
import io.zeebe.logstreams.spi.SnapshotSupport;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.protocol.impl.BrokerEventMetadata;
import io.zeebe.util.buffer.BufferReader;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.collection.LongRingBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;

/**
 * Stream processor that forwards the events of a log stream to a single topic subscriber.
 *
 * <p>Each event handed to {@link #onEvent(LoggedEvent)} is written to the subscriber's client
 * stream through the configured {@link SubscribedEventWriter} in
 * {@link #executeSideEffects()}. When the processor is created with a positive prefetch
 * capacity, the positions of pushed events are tracked in {@code pendingEvents} until they are
 * acknowledged via {@link #onAck(long)}; the processor reports itself as suspended while that
 * buffer is saturated. With a non-positive prefetch capacity no tracking takes place.
 *
 * <p>The processor starts out disabled and stays suspended until {@link #enable()} is called.
 */
public class TopicSubscriptionPushProcessor implements StreamProcessor, EventProcessor {
    /** Reusable metadata holder, refilled for every pushed event. */
    protected final BrokerEventMetadata metadata = new BrokerEventMetadata();
    /** Event most recently passed to {@link #onEvent(LoggedEvent)}. */
    protected LoggedEvent event;
    /** Identifier of the client stream the events are written to. */
    protected final int clientStreamId;
    protected final long subscriberKey;
    /** Requested start position; replaced by the reader's actual position in {@link #onOpen}. */
    protected long startPosition;
    /** Private copy of the subscription name buffer passed to the constructor. */
    protected final DirectBuffer name;
    /** The subscription name decoded as UTF-8. */
    protected final String nameString;
    protected DirectBuffer logStreamTopicName;
    protected int logStreamPartitionId;
    protected final SnapshotSupport snapshotSupport = new NoopSnapshotSupport();
    protected final SubscribedEventWriter channelWriter;
    /** Positions of pushed events awaiting acknowledgement; {@code null} when tracking is off. */
    protected LongRingBuffer pendingEvents;
    /** Acknowledged positions not yet applied to {@link #pendingEvents}; {@code null} when tracking is off. */
    protected LongRingBuffer pendingAcks;
    /** Whether pushing has been enabled; see {@link #enable()} and {@link #isSuspended()}. */
    protected AtomicBoolean enabled;

    /**
     * Creates a disabled push processor.
     *
     * @param clientStreamId identifier of the client stream that events are written to
     * @param subscriberKey key of the subscriber, attached to every pushed event
     * @param startPosition log position to start from; a negative value makes
     *     {@link #onOpen(StreamProcessorContext)} position the reader relative to the last event
     * @param name subscription name; the buffer is cloned, so later changes to the argument are
     *     not reflected by {@link #getName()}
     * @param prefetchCapacity capacity of the pending-event and pending-ack buffers; when it is
     *     not positive, acknowledgement tracking is disabled altogether
     * @param channelWriter writer used to send events to the subscriber
     */
    public TopicSubscriptionPushProcessor(int clientStreamId, long subscriberKey, long startPosition, DirectBuffer name, int prefetchCapacity, SubscribedEventWriter channelWriter) {
        this.channelWriter = channelWriter;
        this.clientStreamId = clientStreamId;
        this.subscriberKey = subscriberKey;
        this.startPosition = startPosition;
        this.name = BufferUtil.cloneBuffer(name);
        this.nameString = name.getStringWithoutLengthUtf8(0, name.capacity());
        this.enabled = new AtomicBoolean(false);
        if (prefetchCapacity > 0) {
            this.pendingEvents = new LongRingBuffer(prefetchCapacity);
            this.pendingAcks = new LongRingBuffer(prefetchCapacity);
        }
    }

    /**
     * Captures the topic name and partition id of the source stream and moves the source reader
     * to the subscription's start position.
     *
     * @param context context providing the source stream and its reader
     */
    public void onOpen(StreamProcessorContext context) {
        LogStreamReader logReader = context.getSourceLogStreamReader();
        LogStream sourceStream = context.getSourceStream();
        this.logStreamTopicName = sourceStream.getTopicName();
        this.logStreamPartitionId = sourceStream.getPartitionId();
        this.setToStartPosition(logReader);
    }

    /**
     * @return the start position; after {@link #onOpen(StreamProcessorContext)} this is the
     *     position reported by the reader once it was positioned
     */
    public long getStartPosition() {
        return this.startPosition;
    }

    /**
     * Positions the reader and records the resulting position as the new start position.
     *
     * <p>A non-negative start position is sought directly. Otherwise the reader is moved to the
     * last event and, if it reports a next event, advanced by one.
     *
     * @param logReader reader of the source log stream
     */
    protected void setToStartPosition(LogStreamReader logReader) {
        if (this.startPosition >= 0L) {
            logReader.seek(this.startPosition);
        } else {
            logReader.seekToLastEvent();
            if (logReader.hasNext()) {
                logReader.next();
            }
        }
        // Remember where the reader actually ended up, which may differ from the request.
        this.startPosition = logReader.getPosition();
    }

    /**
     * @return the snapshot support of this processor, a {@link NoopSnapshotSupport}
     */
    public SnapshotSupport getStateResource() {
        return this.snapshotSupport;
    }

    /**
     * Remembers the event so that it can be pushed in {@link #executeSideEffects()}.
     *
     * @param event the event to push
     * @return this processor
     */
    public EventProcessor onEvent(LoggedEvent event) {
        this.event = event;
        return this;
    }

    /** Does nothing; all work happens in {@link #executeSideEffects()}. */
    public void processEvent() {
    }

    /**
     * Writes the current event to the subscriber's client stream and, if acknowledgement
     * tracking is active, records its position as pending.
     *
     * @return {@code true} if the message was written, {@code false} if the writer rejected it
     * @throws RuntimeException if the event was written but its position could not be added to
     *     the pending-event buffer
     */
    public boolean executeSideEffects() {
        this.event.readMetadata(this.metadata);
        boolean success = this.channelWriter
            .topicName(this.logStreamTopicName)
            .partitionId(this.logStreamPartitionId)
            .eventType(this.metadata.getEventType())
            .key(this.event.getKey())
            .position(this.event.getPosition())
            .subscriberKey(this.subscriberKey)
            .subscriptionType(SubscriptionType.TOPIC_SUBSCRIPTION)
            .event(this.event.getValueBuffer(), this.event.getValueOffset(), this.event.getValueLength())
            .tryWriteMessage(this.clientStreamId);
        if (success && this.recordsPendingEvents()) {
            long position = this.event.getPosition();
            if (!this.pendingEvents.addElementToHead(position)) {
                // Report the affected position; the add result itself is always false here.
                throw new RuntimeException("Cannot record pending event at position " + position);
            }
        }
        return success;
    }

    /**
     * Tells whether the processor should currently not be given further events.
     *
     * <p>Note that this method is not a pure query: when acknowledgement tracking is active it
     * first drains {@code pendingAcks}, passing every acknowledged position to
     * {@code pendingEvents.consumeAscendingUntilInclusive}, before checking saturation.
     *
     * @return {@code true} if the processor has not been enabled, or if tracking is active and
     *     the pending-event buffer is saturated; {@code false} otherwise
     */
    public boolean isSuspended() {
        if (!this.enabled.get()) {
            return true;
        }
        if (this.recordsPendingEvents()) {
            this.pendingAcks.consume(ackedPosition -> this.pendingEvents.consumeAscendingUntilInclusive(ackedPosition));
            return this.pendingEvents.isSaturated();
        }
        return false;
    }

    /**
     * @return the identifier of the client stream that events are written to
     */
    public int getChannelId() {
        return this.clientStreamId;
    }

    /**
     * @return the writer used to push events to the subscriber
     */
    public SubscribedEventWriter getChannelWriter() {
        return this.channelWriter;
    }

    /**
     * @return the subscription name as a string
     */
    public String getNameAsString() {
        return this.nameString;
    }

    /**
     * Records an acknowledgement; it is applied to the pending events on the next call to
     * {@link #isSuspended()}. Does nothing when acknowledgement tracking is disabled.
     *
     * @param eventPosition the acknowledged event position
     * @throws RuntimeException if the acknowledgement could not be added to the ack buffer
     */
    public void onAck(long eventPosition) {
        if (this.recordsPendingEvents() && !this.pendingAcks.addElementToHead(eventPosition)) {
            throw new RuntimeException("Could not acknowledge event at position " + eventPosition + "; ACK capacity saturated");
        }
    }

    /**
     * @return {@code true} if acknowledgement tracking is active, i.e. the processor was created
     *     with a positive prefetch capacity
     */
    protected boolean recordsPendingEvents() {
        return this.pendingEvents != null;
    }

    /**
     * @return a filter that accepts every event whose type is neither
     *     {@code SUBSCRIPTION_EVENT}, {@code SUBSCRIBER_EVENT} nor {@code NOOP_EVENT}
     */
    public static MetadataFilter eventFilter() {
        return m -> {
            EventType eventType = m.getEventType();
            return eventType != EventType.SUBSCRIPTION_EVENT && eventType != EventType.SUBSCRIBER_EVENT && eventType != EventType.NOOP_EVENT;
        };
    }

    /**
     * @return the processor's own copy of the subscription name buffer
     */
    public DirectBuffer getName() {
        return this.name;
    }

    /**
     * @return the subscriber key this processor was created with
     */
    public long getSubscriptionId() {
        return this.subscriberKey;
    }

    /** Enables pushing; until this is called {@link #isSuspended()} returns {@code true}. */
    public void enable() {
        this.enabled.set(true);
    }
}

