/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.command.log.internal;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.collections.Long2LongHashMap;
import org.reaktivity.command.log.internal.Logger;
import org.reaktivity.command.log.internal.labels.LabelManager;
import org.reaktivity.command.log.internal.layouts.StreamsLayout;
import org.reaktivity.command.log.internal.spy.RingBufferSpy;
import org.reaktivity.command.log.internal.types.OctetsFW;
import org.reaktivity.command.log.internal.types.TcpAddressFW;
import org.reaktivity.command.log.internal.types.control.Role;
import org.reaktivity.command.log.internal.types.stream.AbortFW;
import org.reaktivity.command.log.internal.types.stream.BeginFW;
import org.reaktivity.command.log.internal.types.stream.DataFW;
import org.reaktivity.command.log.internal.types.stream.EndFW;
import org.reaktivity.command.log.internal.types.stream.FrameFW;
import org.reaktivity.command.log.internal.types.stream.HttpBeginExFW;
import org.reaktivity.command.log.internal.types.stream.ResetFW;
import org.reaktivity.command.log.internal.types.stream.SignalFW;
import org.reaktivity.command.log.internal.types.stream.TcpBeginExFW;
import org.reaktivity.command.log.internal.types.stream.WindowFW;

public final class LoggableStream
implements AutoCloseable {
    private final FrameFW frameRO = new FrameFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final SignalFW signalRO = new SignalFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    private final TcpBeginExFW tcpBeginExRO = new TcpBeginExFW();
    private final HttpBeginExFW httpBeginExRO = new HttpBeginExFW();
    private final int index;
    private final LabelManager labels;
    private final String streamFormat;
    private final String throttleFormat;
    private final StreamsLayout layout;
    private final RingBufferSpy streamsBuffer;
    private final Logger out;
    private final boolean verbose;
    private final Long2LongHashMap budgets;
    private final Long2LongHashMap timestamps;
    private final LongPredicate nextTimestamp;

    LoggableStream(int index, LabelManager labels, Long2LongHashMap budgets, StreamsLayout layout, Logger logger, boolean verbose, Long2LongHashMap timestamps, LongPredicate nextTimestamp) {
        this.index = index;
        this.labels = labels;
        this.streamFormat = "[%d] [0x%08x] [0x%016x] [%s -> %s]\t[0x%016x] [0x%016x] [%016x] %s\n";
        this.throttleFormat = "[%d] [0x%08x] [0x%016x] [%s <- %s]\t[0x%016x] [0x%016x] [%016x] %s\n";
        this.layout = layout;
        this.streamsBuffer = layout.streamsBuffer();
        this.out = logger;
        this.verbose = verbose;
        this.budgets = budgets;
        this.timestamps = timestamps;
        this.nextTimestamp = nextTimestamp;
    }

    int process() {
        return this.streamsBuffer.spy(this::handleFrame, 1);
    }

    @Override
    public void close() throws Exception {
        this.layout.close();
    }

    public String toString() {
        return String.format("data%d (spy)", this.index);
    }

    private boolean handleFrame(int msgTypeId, DirectBuffer buffer, int index, int length) {
        FrameFW frame = this.frameRO.wrap(buffer, index, index + length);
        long timestamp = frame.timestamp();
        if (!this.nextTimestamp.test(timestamp)) {
            return false;
        }
        switch (msgTypeId) {
            case 1: {
                BeginFW begin = this.beginRO.wrap(buffer, index, index + length);
                this.handleBegin(begin);
                break;
            }
            case 2: {
                DataFW data = this.dataRO.wrap(buffer, index, index + length);
                this.handleData(data);
                break;
            }
            case 3: {
                EndFW end = this.endRO.wrap(buffer, index, index + length);
                this.handleEnd(end);
                break;
            }
            case 4: {
                AbortFW abort = this.abortRO.wrap(buffer, index, index + length);
                this.handleAbort(abort);
                break;
            }
            case 5: {
                SignalFW signal = this.signalRO.wrap(buffer, index, index + length);
                this.handleSignal(signal);
                break;
            }
            case 0x40000001: {
                ResetFW reset = this.resetRO.wrap(buffer, index, index + length);
                this.handleReset(reset);
                break;
            }
            case 0x40000002: {
                WindowFW window = this.windowRO.wrap(buffer, index, index + length);
                this.handleWindow(window);
            }
        }
        return true;
    }

    private void handleBegin(BeginFW begin) {
        long timestamp = begin.timestamp();
        long routeId = begin.routeId();
        long streamId = begin.streamId();
        long traceId = begin.trace();
        long authorization = begin.authorization();
        int budget = (int)this.budgets.computeIfAbsent(streamId, id -> 0L);
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.computeIfAbsent(initialId, id -> timestamp);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.streamFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("BEGIN [0x%016x]", authorization));
        OctetsFW extension = begin.extension();
        if (this.verbose && extension.sizeof() != 0) {
            int roleId;
            if (sourceName.startsWith("tcp") || targetName.startsWith("tcp")) {
                TcpBeginExFW tcpBeginEx = this.tcpBeginExRO.wrap(extension.buffer(), extension.offset(), extension.limit());
                InetSocketAddress localAddress = this.toInetSocketAddress(tcpBeginEx.localAddress(), tcpBeginEx.localPort());
                InetSocketAddress remoteAddress = this.toInetSocketAddress(tcpBeginEx.remoteAddress(), tcpBeginEx.remotePort());
                this.out.printf("[%d] %s\t%s\n", timestamp, localAddress, remoteAddress);
            }
            if ((sourceName.startsWith("http") || targetName.startsWith("http")) && (roleId = (int)(routeId >> 28) & 0xF) != 15) {
                Role role = Role.valueOf(roleId);
                boolean isInitial = (streamId & 1L) != 0L;
                boolean isReply = (streamId & 1L) == 0L;
                Predicate<String> isHttp11 = label -> label.startsWith("http#");
                Predicate<String> isHttp2 = label -> label.startsWith("http2#");
                Predicate<String> isHttpCodec = isHttp11.or(isHttp2);
                if (!(role == Role.SERVER && isInitial && isHttpCodec.test(targetName) || role == Role.SERVER && isReply && isHttpCodec.test(sourceName) || role == Role.CLIENT && isInitial && isHttpCodec.test(sourceName) || role == Role.CLIENT && isReply && isHttpCodec.test(targetName))) {
                    HttpBeginExFW httpBeginEx = this.httpBeginExRO.wrap(extension.buffer(), extension.offset(), extension.limit());
                    httpBeginEx.headers().forEach(h -> this.out.printf("[%d] %s: %s\n", timestamp, h.name().asString(), h.value().asString()));
                }
            }
        }
    }

    private void handleData(DataFW data) {
        long timestamp = data.timestamp();
        long routeId = data.routeId();
        long streamId = data.streamId();
        long traceId = data.trace();
        int length = data.length();
        int padding = data.padding();
        long authorization = data.authorization();
        byte flags = (byte)(data.flags() & 0xFF);
        int budget = (int)this.budgets.computeIfPresent(streamId, (i, b) -> b - (long)(length + padding)).longValue();
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.streamFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("DATA [%d] [%d] [%x] [0x%016x]", length, padding, flags, authorization));
    }

    private void handleEnd(EndFW end) {
        long timestamp = end.timestamp();
        long routeId = end.routeId();
        long streamId = end.streamId();
        long traceId = end.trace();
        long authorization = end.authorization();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.streamFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("END [0x%016x]", authorization));
    }

    private void handleAbort(AbortFW abort) {
        long timestamp = abort.timestamp();
        long routeId = abort.routeId();
        long streamId = abort.streamId();
        long traceId = abort.trace();
        long authorization = abort.authorization();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.streamFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("ABORT [0x%016x]", authorization));
    }

    private void handleSignal(SignalFW signal) {
        long timestamp = signal.timestamp();
        long routeId = signal.routeId();
        long streamId = signal.streamId();
        long traceId = signal.trace();
        long authorization = signal.authorization();
        long signalId = signal.signalId();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.streamFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("SIGNAL [%d] [0x%016x]", signalId, authorization));
    }

    private void handleReset(ResetFW reset) {
        long timestamp = reset.timestamp();
        long routeId = reset.routeId();
        long streamId = reset.streamId();
        long traceId = reset.trace();
        int budget = (int)this.budgets.get(streamId);
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.throttleFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, "RESET");
    }

    private void handleWindow(WindowFW window) {
        long timestamp = window.timestamp();
        long routeId = window.routeId();
        long streamId = window.streamId();
        long traceId = window.trace();
        int credit = window.credit();
        int padding = window.padding();
        long groupId = window.groupId();
        int budget = (int)this.budgets.computeIfPresent(streamId, (i, b) -> b + (long)credit).longValue();
        long initialId = streamId | 1L;
        long timeStart = this.timestamps.get(initialId);
        long timeOffset = timeStart != -1L ? timestamp - timeStart : -1L;
        int localId = (int)(routeId >> 48) & 0xFFFF;
        int remoteId = (int)(routeId >> 32) & 0xFFFF;
        int sourceId = streamId == initialId ? localId : remoteId;
        int targetId = streamId == initialId ? remoteId : localId;
        String sourceName = this.labels.lookupLabel(sourceId);
        String targetName = this.labels.lookupLabel(targetId);
        this.out.printf(this.throttleFormat, timestamp, budget, traceId, sourceName, targetName, routeId, streamId, timeOffset, String.format("WINDOW [%d] [%d] [%d]", credit, padding, groupId));
    }

    private InetSocketAddress toInetSocketAddress(TcpAddressFW tcpAddress, int tcpPort) {
        InetSocketAddress socketAddress = null;
        try {
            switch (tcpAddress.kind()) {
                case 1: {
                    byte[] address = new byte[4];
                    tcpAddress.ipv4Address().get((b, o, l) -> {
                        b.getBytes(o, address);
                        return address;
                    });
                    socketAddress = new InetSocketAddress(InetAddress.getByAddress(address), tcpPort);
                    break;
                }
                case 2: {
                    byte[] address = new byte[16];
                    tcpAddress.ipv4Address().get((b, o, l) -> {
                        b.getBytes(o, address);
                        return address;
                    });
                    socketAddress = new InetSocketAddress(InetAddress.getByAddress(address), tcpPort);
                    break;
                }
                case 3: {
                    String hostName = tcpAddress.host().asString();
                    socketAddress = new InetSocketAddress(hostName, tcpPort);
                }
            }
        }
        catch (UnknownHostException ex) {
            LangUtil.rethrowUnchecked(ex);
        }
        return socketAddress;
    }
}

