/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.command.log.internal;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.collections.Long2LongHashMap;
import org.reaktivity.command.log.internal.Logger;
import org.reaktivity.command.log.internal.layouts.StreamsLayout;
import org.reaktivity.command.log.internal.spy.RingBufferSpy;
import org.reaktivity.command.log.internal.types.OctetsFW;
import org.reaktivity.command.log.internal.types.TcpAddressFW;
import org.reaktivity.command.log.internal.types.stream.AbortFW;
import org.reaktivity.command.log.internal.types.stream.BeginFW;
import org.reaktivity.command.log.internal.types.stream.DataFW;
import org.reaktivity.command.log.internal.types.stream.EndFW;
import org.reaktivity.command.log.internal.types.stream.FrameFW;
import org.reaktivity.command.log.internal.types.stream.HttpBeginExFW;
import org.reaktivity.command.log.internal.types.stream.ResetFW;
import org.reaktivity.command.log.internal.types.stream.TcpBeginExFW;
import org.reaktivity.command.log.internal.types.stream.WindowFW;

public final class LoggableStream
implements AutoCloseable {
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    private final TcpBeginExFW tcpBeginExRO = new TcpBeginExFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final String streamFormat;
    private final String throttleFormat;
    private final String targetName;
    private final StreamsLayout layout;
    private final RingBufferSpy streamsBuffer;
    private final Logger out;
    private final boolean verbose;
    private final Long2LongHashMap budgets;
    private final Long2LongHashMap timestamps;
    private final LongPredicate nextTimestamp;

    LoggableStream(String receiver, String sender, StreamsLayout layout, Logger logger, boolean verbose, Long2LongHashMap timestamps, LongPredicate nextTimestamp) {
        this.streamFormat = String.format("[%%d] [0x%%08x] [0x%%016x] [%s -> %s]\t[0x%%016x] [%%016x] %%s\n", sender, receiver);
        this.throttleFormat = String.format("[%%d] [0x%%08x] [0x%%016x] [%s <- %s]\t[0x%%016x] [%%016x] %%s\n", receiver, sender);
        this.layout = layout;
        this.streamsBuffer = layout.streamsBuffer();
        this.targetName = receiver;
        this.out = logger;
        this.verbose = verbose;
        this.budgets = new Long2LongHashMap(-1L);
        this.timestamps = timestamps;
        this.nextTimestamp = nextTimestamp;
    }

    int process() {
        return this.streamsBuffer.spy(this::handleFrame, 1);
    }

    @Override
    public void close() throws Exception {
        this.layout.close();
    }

    private boolean handleFrame(int msgTypeId, DirectBuffer buffer, int index, int length) {
        FrameFW frame = this.frameRO.wrap(buffer, index, index + length);
        long timestamp = frame.timestamp();
        if (!this.nextTimestamp.test(timestamp)) {
            return false;
        }
        switch (msgTypeId) {
            case 1: {
                BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
                this.handleBegin(begin);
                break;
            }
            case 2: {
                DataFW data = this.dataRO.wrap(buffer, index, index + length);
                this.handleData(data);
                break;
            }
            case 3: {
                EndFW end = this.endRO.wrap(buffer, index, index + length);
                this.handleEnd(end);
                break;
            }
            case 4: {
                AbortFW abort = this.abortRO.wrap(buffer, index, index + length);
                this.handleAbort(abort);
                break;
            }
            case 0x40000001: {
                ResetFW reset = this.resetRO.wrap(buffer, index, index + length);
                this.handleReset(reset);
                break;
            }
            case 0x40000002: {
                WindowFW window = this.windowRO.wrap(buffer, index, index + length);
                this.handleWindow(window);
            }
        }
        return true;
    }

    private void handleBegin(BeginFW begin) {
        long timestamp = begin.timestamp();
        long streamId = begin.streamId();
        long traceId = begin.trace();
        String sourceName = begin.source().asString();
        long sourceRef = begin.sourceRef();
        long correlationId = begin.correlationId();
        long authorization = begin.authorization();
        int budget = (int)this.budgets.computeIfAbsent(streamId, id -> 0L);
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.computeIfAbsent(initialId, id -> timestamp);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(this.streamFormat, timestamp, budget, traceId, streamId, timeOffset, String.format("BEGIN \"%s\" [0x%016x] [0x%016x] [0x%016x]", sourceName, sourceRef, correlationId, authorization));
        OctetsFW extension = begin.extension();
        if (this.verbose && extension.sizeof() != 0) {
            if (sourceName.equals("tcp") || this.targetName.equals("tcp")) {
                TcpBeginExFW tcpBeginEx = this.tcpBeginExRO.wrap(extension.buffer(), extension.offset(), extension.limit());
                InetSocketAddress localAddress = this.toInetSocketAddress(tcpBeginEx.localAddress(), tcpBeginEx.localPort());
                InetSocketAddress remoteAddress = this.toInetSocketAddress(tcpBeginEx.remoteAddress(), tcpBeginEx.remotePort());
                this.out.printf("[%d] %s\t%s\n", timestamp, localAddress, remoteAddress);
            }
            if (sourceName.startsWith("http")) {
                boolean isHttpProxyReply;
                boolean initial = sourceRef != 0L;
                long typedRef = sourceRef != 0L ? sourceRef : correlationId;
                Predicate<String> isHttp = n -> n.startsWith("http");
                LongPredicate isClient = r -> r > 0L && (r & 1L) != 0L;
                LongPredicate isServer = r -> r > 0L && (r & 1L) == 0L;
                LongPredicate isProxy = r -> r < 0L && (r & 1L) == 0L;
                boolean isHttpClientInitial = initial && isClient.test(typedRef) && isHttp.test(this.targetName);
                boolean isHttpClientReply = !initial && isClient.test(typedRef) && isHttp.test(sourceName);
                boolean isHttpServerInitial = initial && isServer.test(typedRef) && isHttp.test(sourceName);
                boolean isHttpServerReply = !initial && isServer.test(typedRef) && isHttp.test(this.targetName);
                boolean isHttpProxyInitial = initial && isProxy.test(typedRef) && (isHttp.test(sourceName) || isHttp.test(this.targetName));
                boolean bl = isHttpProxyReply = !initial && isProxy.test(typedRef) && (isHttp.test(sourceName) || isHttp.test(this.targetName));
                if (isHttpClientInitial || isHttpServerReply || isHttpClientReply || isHttpServerInitial || isHttpProxyInitial | isHttpProxyReply) {
                    HttpBeginExFW httpBeginEx = this.httpBeginExRO.wrap(extension.buffer(), extension.offset(), extension.limit());
                    httpBeginEx.headers().forEach(h -> this.out.printf("[%d] %s: %s\n", timestamp, h.name().asString(), h.value().asString()));
                }
            }
        }
    }

    private void handleData(DataFW data) {
        long timestamp = data.timestamp();
        long streamId = data.streamId();
        long traceId = data.trace();
        int length = data.length();
        int padding = data.padding();
        long authorization = data.authorization();
        byte flags = (byte)(data.flags() & 0xFF);
        int budget = (int)this.budgets.computeIfPresent(streamId, (i, b) -> b - (long)(length + padding)).longValue();
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(String.format(this.streamFormat, timestamp, budget, traceId, streamId, timeOffset, String.format("DATA [%d] [%d] [%x] [0x%016x]", length, padding, flags, authorization)), new Object[0]);
    }

    private void handleEnd(EndFW end) {
        long timestamp = end.timestamp();
        long streamId = end.streamId();
        long traceId = end.trace();
        long authorization = end.authorization();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(String.format(this.streamFormat, timestamp, budget, traceId, streamId, timeOffset, String.format("END [0x%016x]", authorization)), new Object[0]);
    }

    private void handleAbort(AbortFW abort) {
        long timestamp = abort.timestamp();
        long streamId = abort.streamId();
        long traceId = abort.trace();
        long authorization = abort.authorization();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(String.format(this.streamFormat, timestamp, budget, traceId, streamId, timeOffset, String.format("ABORT [0x%016x]", authorization)), new Object[0]);
    }

    private void handleReset(ResetFW reset) {
        long timestamp = reset.timestamp();
        long streamId = reset.streamId();
        long traceId = reset.trace();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(String.format(this.throttleFormat, timestamp, budget, traceId, streamId, timeOffset, "RESET"), new Object[0]);
    }

    private void handleWindow(WindowFW window) {
        long timestamp = window.timestamp();
        long streamId = window.streamId();
        long traceId = window.trace();
        int credit = window.credit();
        int padding = window.padding();
        long groupId = window.groupId();
        int budget = (int)this.budgets.computeIfPresent(streamId, (i, b) -> b + (long)credit).longValue();
        long initialId = streamId & Long.MAX_VALUE;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        this.out.printf(String.format(this.throttleFormat, timestamp, budget, traceId, streamId, timeOffset, String.format("WINDOW [%d] [%d] [%d]", credit, padding, groupId)), new Object[0]);
    }

    private InetSocketAddress toInetSocketAddress(TcpAddressFW tcpAddress, int tcpPort) {
        InetSocketAddress socketAddress = null;
        try {
            switch (tcpAddress.kind()) {
                case 1: {
                    byte[] address = new byte[4];
                    tcpAddress.ipv4Address().get((b, o, l) -> {
                        b.getBytes(o, address);
                        return address;
                    });
                    socketAddress = new InetSocketAddress(InetAddress.getByAddress(address), tcpPort);
                    break;
                }
                case 2: {
                    byte[] address = new byte[16];
                    tcpAddress.ipv4Address().get((b, o, l) -> {
                        b.getBytes(o, address);
                        return address;
                    });
                    socketAddress = new InetSocketAddress(InetAddress.getByAddress(address), tcpPort);
                    break;
                }
                case 3: {
                    String hostName = tcpAddress.host().asString();
                    socketAddress = new InetSocketAddress(hostName, tcpPort);
                }
            }
        }
        catch (UnknownHostException ex) {
            LangUtil.rethrowUnchecked(ex);
        }
        return socketAddress;
    }
}

