package org.reaktivity.nukleus.tcp.internal.bench;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.agrona.IoUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Control;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.reaktivity.nukleus.Configuration;
import org.reaktivity.nukleus.tcp.internal.TcpController;
import org.reaktivity.nukleus.tcp.internal.TcpStreams;
import org.reaktivity.nukleus.tcp.internal.types.control.Role;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.DataFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.WindowFW;
import org.reaktivity.reaktor.internal.Reaktor;

/**
 * JMH throughput benchmark that pushes bytes through a TCP route of the "tcp" nukleus.
 *
 * <p>The benchmark group {@code throughput} runs two threads against one shared instance:
 * {@link #writer(Control)} connects a client socket to the routed port and writes a fixed
 * payload as fast as the socket accepts it, while {@link #reader(Control)} drains the
 * streams obtained from the {@link TcpController} and answers every frame it recognises
 * with a window frame on the throttle.
 */
@State(Scope.Benchmark)
@BenchmarkMode({Mode.Throughput})
@Fork(3)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
public class TcpServerBM {
    /** Name matched against the nukleus names offered by {@link Reaktor#launch}. */
    private static final String NUKLEUS_NAME = "tcp";

    /** Source name used for the route and for looking up the streams. */
    private static final String SOURCE_NAME = "any";

    /** Target name used for the route and for looking up the streams. */
    private static final String TARGET_NAME = "target";

    /** Streams file created under the configured directory before routing. */
    private static final String TARGET_STREAMS_PATH = "tcp/streams/target";

    /** Address used both for the route and for the client connection. */
    private static final String LOOPBACK_ADDRESS = "127.0.0.1";

    /** Port the client connects to; the same value is passed as the route's source reference. */
    private static final int SERVER_PORT = 8080;

    /** Value of the "nuklei.streams.buffer.capacity" property (16 MiB). */
    private static final long STREAMS_BUFFER_CAPACITY = 16L * 1024L * 1024L;

    /** Capacity of the scratch buffer the window frame is built in. */
    private static final int THROTTLE_BUFFER_CAPACITY = 12;

    /** Window update sent in response to a begin frame. */
    private static final int INITIAL_WINDOW = 8192;

    // NOTE(review): presumably BeginFW.TYPE_ID / DataFW.TYPE_ID inlined by the compiler -- confirm.
    private static final int BEGIN_TYPE_ID = 1;
    private static final int DATA_TYPE_ID = 2;

    private final Configuration configuration;
    private final Reaktor reaktor;
    private final BeginFW beginRO;
    private final DataFW dataRO;
    private final WindowFW.Builder windowRW;
    private TcpStreams streams;
    private ByteBuffer sendByteBuffer;
    private AtomicBuffer throttleBuffer;
    private final Random random;
    private final long targetRef;

    /**
     * Builds the configuration, launches the reaktor restricted to the "tcp" nukleus and
     * {@link TcpController}, and picks a random target reference for the route.
     */
    public TcpServerBM() {
        Properties properties = new Properties();
        properties.setProperty("nuklei.directory", "target/nukleus-benchmarks");
        properties.setProperty("nuklei.streams.buffer.capacity", Long.toString(STREAMS_BUFFER_CAPACITY));
        this.configuration = new Configuration(properties);

        this.reaktor = Reaktor.launch(
                this.configuration,
                name -> NUKLEUS_NAME.equals(name),
                TcpController.class::isAssignableFrom);

        this.beginRO = new BeginFW();
        this.dataRO = new DataFW();
        this.windowRW = new WindowFW.Builder();
        this.random = new Random();
        this.targetRef = this.random.nextLong();
    }

    /**
     * Per-trial setup: prepares the payload and throttle buffers, creates the target streams
     * file, registers the route and obtains the streams the reader polls.
     *
     * @throws Exception if the file cannot be created or routing fails
     */
    @Setup(Level.Trial)
    public void reinit() throws Exception {
        byte[] payload = "Hello, world".getBytes(StandardCharsets.UTF_8);
        // After put() the position sits at the end; writer() rewinds it before every write.
        this.sendByteBuffer = ByteBuffer.allocateDirect(payload.length).order(ByteOrder.nativeOrder()).put(payload);
        this.throttleBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(THROTTLE_BUFFER_CAPACITY));

        IoUtil.createEmptyFile(
                this.configuration.directory().resolve(TARGET_STREAMS_PATH).toFile().getAbsoluteFile(),
                this.configuration.streamsBufferCapacity() + RingBufferDescriptor.TRAILER_LENGTH);

        TcpController controller = this.reaktor.controller(TcpController.class);
        // State is fully qualified because the simple name is taken by the JMH @State annotation.
        controller.route(
                Role.INPUT,
                org.reaktivity.nukleus.tcp.internal.types.control.State.NEW,
                SOURCE_NAME,
                (long) SERVER_PORT,
                TARGET_NAME,
                this.targetRef,
                InetAddress.getByName(LOOPBACK_ADDRESS)).get();
        this.streams = controller.streams(SOURCE_NAME, TARGET_NAME);
    }

    /**
     * Per-trial teardown: closes the streams and removes the route registered by
     * {@link #reinit()}.
     *
     * @throws Exception if closing the streams or unrouting fails
     */
    @TearDown(Level.Trial)
    public void reset() throws Exception {
        this.streams.close();
        this.streams = null;

        TcpController controller = this.reaktor.controller(TcpController.class);
        controller.unroute(
                Role.INPUT,
                org.reaktivity.nukleus.tcp.internal.types.control.State.NEW,
                SOURCE_NAME,
                (long) SERVER_PORT,
                TARGET_NAME,
                this.targetRef,
                InetAddress.getByName(LOOPBACK_ADDRESS)).get();
    }

    /**
     * Writer half of the {@code throughput} group. While measurement is active it connects a
     * non-blocking client socket to the routed port and repeatedly writes the payload,
     * yielding whenever the socket accepts no bytes.
     *
     * @param control JMH control flags signalling the start and stop of measurement
     * @throws Exception if the socket cannot be opened, connected, written to or closed
     */
    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public void writer(Control control) throws Exception {
        if (!control.startMeasurement || control.stopMeasurement) {
            return;
        }

        // try-with-resources closes the channel on both the normal and the exceptional path,
        // attaching any close failure as a suppressed exception.
        try (SocketChannel channel = SocketChannel.open()) {
            channel.connect(new InetSocketAddress(LOOPBACK_ADDRESS, SERVER_PORT));
            channel.configureBlocking(false);

            while (control.startMeasurement && !control.stopMeasurement) {
                // Always restart from the beginning of the payload, even after a partial write.
                this.sendByteBuffer.position(0);
                if (channel.write(this.sendByteBuffer) == 0) {
                    Thread.yield();
                }
            }
        }
    }

    /**
     * Reader half of the {@code throughput} group. Polls the streams until at least one
     * message has been read or measurement stops, yielding between empty polls.
     *
     * @param control JMH control flags signalling the stop of measurement
     * @throws Exception declared for symmetry with the other benchmark methods
     */
    @GroupThreads(1)
    @Benchmark
    @Group("throughput")
    public void reader(Control control) throws Exception {
        MessageHandler handler = this::handleRead;
        while (!control.stopMeasurement && this.streams.readStreams(handler) == 0) {
            Thread.yield();
        }
    }

    /**
     * Handles one message read from the streams. A begin frame is answered with an initial
     * window; a data frame is answered with a window equal to its payload length. Any other
     * message type is ignored.
     *
     * @param msgTypeId type id of the message
     * @param buffer    buffer holding the message
     * @param index     offset of the message within {@code buffer}
     * @param length    length of the message in bytes
     */
    private void handleRead(int msgTypeId, MutableDirectBuffer buffer, int index, int length) {
        switch (msgTypeId) {
            case BEGIN_TYPE_ID:
                this.beginRO.wrap(buffer, index, index + length);
                doWindow(this.beginRO.streamId(), INITIAL_WINDOW);
                break;
            case DATA_TYPE_ID:
                this.dataRO.wrap(buffer, index, index + length);
                doWindow(this.dataRO.streamId(), this.dataRO.payload().length());
                break;
            default:
                break;
        }
    }

    /**
     * Builds a window frame in the throttle scratch buffer and writes it to the throttle of
     * the streams.
     *
     * @param streamId stream the window applies to
     * @param update   window update value
     */
    private void doWindow(long streamId, int update) {
        WindowFW window = this.windowRW
                .wrap(this.throttleBuffer, 0, this.throttleBuffer.capacity())
                .streamId(streamId)
                .update(update)
                .build();
        this.streams.writeThrottle(window.typeId(), window.buffer(), window.offset(), window.length());
    }

    /**
     * Runs this benchmark through the JMH {@link Runner}, requesting zero forks.
     *
     * @param strArr command-line arguments (unused)
     * @throws RunnerException if the JMH run fails
     */
    public static void main(String[] strArr) throws RunnerException {
        new Runner(new OptionsBuilder()
                .include(TcpServerBM.class.getSimpleName())
                .forks(0)
                .build()).run();
    }
}
