/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.command.log.internal.spy;

import java.util.concurrent.atomic.AtomicLong;
import org.agrona.BitUtil;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.ringbuffer.RecordDescriptor;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.reaktivity.command.log.internal.spy.RingBufferSpy;
import org.reaktivity.nukleus.function.MessagePredicate;

public class OneToOneRingBufferSpy
implements RingBufferSpy {
    private final int capacity;
    private final AtomicLong headPosition;
    private final AtomicBuffer buffer;

    public OneToOneRingBufferSpy(AtomicBuffer buffer) {
        this.buffer = buffer;
        RingBufferDescriptor.checkCapacity(buffer.capacity());
        this.capacity = buffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH;
        buffer.verifyAlignment();
        this.headPosition = new AtomicLong();
    }

    public void resetHead() {
        this.headPosition.lazySet(this.buffer.getLong(this.capacity + RingBufferDescriptor.HEAD_POSITION_OFFSET));
    }

    @Override
    public DirectBuffer buffer() {
        return this.buffer;
    }

    @Override
    public long producerPosition() {
        return this.buffer.getLong(this.buffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH + RingBufferDescriptor.TAIL_POSITION_OFFSET);
    }

    @Override
    public long consumerPosition() {
        return this.buffer.getLong(this.buffer.capacity() - RingBufferDescriptor.TRAILER_LENGTH + RingBufferDescriptor.HEAD_POSITION_OFFSET);
    }

    @Override
    public int spy(MessagePredicate handler) {
        return this.spy(handler, Integer.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int spy(MessagePredicate handler, int messageCountLimit) {
        int bytesRead;
        int messagesRead = 0;
        AtomicBuffer buffer = this.buffer;
        long head = this.headPosition.get();
        int capacity = this.capacity;
        int headIndex = (int)head & capacity - 1;
        int contiguousBlockLength = capacity - headIndex;
        try {
            int recordLength;
            for (bytesRead = 0; bytesRead < contiguousBlockLength && messagesRead < messageCountLimit; bytesRead += BitUtil.align(recordLength, 8)) {
                int recordIndex = headIndex + bytesRead;
                long header = buffer.getLongVolatile(recordIndex);
                recordLength = RecordDescriptor.recordLength(header);
                if (recordLength > 0) continue;
                break;
            }
        }
        finally {
            if (bytesRead != 0) {
                this.headPosition.lazySet(head + (long)bytesRead);
            }
        }
        return messagesRead;
    }
}

