/*
 * Decompiled with CFR 0.152.
 */
package org.drasyl.handler.logging;

import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.PrintStream;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;

/**
 * Duplex channel handler that counts {@link AddressedEnvelope} messages travelling in both
 * directions and periodically prints the measured throughput (messages per second) to a
 * {@link PrintStream}.
 *
 * <p>Reporting starts when the channel becomes active and is cancelled when it becomes inactive.
 * Messages that are not an {@link AddressedEnvelope} are passed through untouched and are not
 * counted.
 *
 * <p>Two predicates decide whether a counted message is consumed by this handler (predicate
 * returns {@code true}) or passed on to the next handler (predicate returns {@code false}).
 */
public class MessagesThroughputHandler extends ChannelDuplexHandler {
    /** Delay between two consecutive throughput reports. */
    public static final Duration INTERVAL = Duration.ofSeconds(1L);
    private static final double MILLIS_PER_SECOND = 1000.0;
    private final BiPredicate<SocketAddress, Object> consumeOutbound;
    private final BiPredicate<SocketAddress, Object> consumeInbound;
    private final LongAdder outboundMessages;
    private final LongAdder inboundMessages;
    private final PrintStream printStream;
    private ScheduledFuture<?> scheduledFuture;

    /**
     * Creates a handler with explicitly supplied collaborators.
     *
     * @param consumeOutbound  tested with (recipient, content) of every outbound envelope; when it
     *                         returns {@code true} the envelope is not written further
     * @param consumeInbound   tested with (sender, content) of every inbound envelope; when it
     *                         returns {@code true} the envelope is not passed further
     * @param outboundMessages counter for outbound envelopes
     * @param inboundMessages  counter for inbound envelopes
     * @param printStream      destination of the periodic report
     * @param scheduledFuture  handle of an already scheduled report task, may be {@code null}
     * @throws NullPointerException if any argument other than {@code scheduledFuture} is
     *                              {@code null}
     */
    MessagesThroughputHandler(BiPredicate<SocketAddress, Object> consumeOutbound, BiPredicate<SocketAddress, Object> consumeInbound, LongAdder outboundMessages, LongAdder inboundMessages, PrintStream printStream, ScheduledFuture<?> scheduledFuture) {
        this.consumeOutbound = Objects.requireNonNull(consumeOutbound);
        this.consumeInbound = Objects.requireNonNull(consumeInbound);
        this.outboundMessages = Objects.requireNonNull(outboundMessages);
        this.inboundMessages = Objects.requireNonNull(inboundMessages);
        this.printStream = Objects.requireNonNull(printStream);
        this.scheduledFuture = scheduledFuture;
    }

    /**
     * Creates a handler that reports to {@link System#out}.
     *
     * @param consumeOutbound tested with (recipient, content) of every outbound envelope; when it
     *                        returns {@code true} the envelope is not written further
     * @param consumeInbound  tested with (sender, content) of every inbound envelope; when it
     *                        returns {@code true} the envelope is not passed further
     * @throws NullPointerException if a predicate is {@code null}
     */
    public MessagesThroughputHandler(BiPredicate<SocketAddress, Object> consumeOutbound, BiPredicate<SocketAddress, Object> consumeInbound) {
        this(consumeOutbound, consumeInbound, new LongAdder(), new LongAdder(), System.out, null);
    }

    /**
     * Creates a handler that reports to {@link System#out} and never consumes a message.
     */
    public MessagesThroughputHandler() {
        this((address, msg) -> false, (address, msg) -> false);
    }

    /**
     * Converts a message count observed over a time span into a per-second rate.
     *
     * @param messages       number of messages observed
     * @param durationMillis length of the observation span in milliseconds
     * @return messages per second, or {@code 0.0} when the span is not positive (e.g. the very
     *         first report, or a wall clock that stepped backwards)
     */
    private static double messagesPerSecond(long messages, long durationMillis) {
        if (durationMillis <= 0L) {
            // avoids NaN/Infinity in the printed report
            return 0.0;
        }
        return (double) messages * MILLIS_PER_SECOND / (double) durationMillis;
    }

    /**
     * Schedules the periodic report on the executor of the given context. The first report is
     * scheduled without initial delay, subsequent ones with a delay of {@link #INTERVAL}.
     *
     * @param ctx context whose executor runs the report task
     */
    private void start(ChannelHandlerContext ctx) {
        long startTime = System.currentTimeMillis();
        AtomicLong intervalTime = new AtomicLong(startTime);
        this.scheduledFuture = ctx.executor().scheduleWithFixedDelay(() -> {
            long currentTime = System.currentTimeMillis();
            long intervalStartTime = intervalTime.get();
            double relativeIntervalStartTime = (double) (intervalStartTime - startTime) / MILLIS_PER_SECOND;
            double relativeIntervalEndTime = (double) (currentTime - startTime) / MILLIS_PER_SECOND;
            // The real elapsed time is used instead of INTERVAL because the task runs with a
            // fixed delay and may therefore drift.
            long intervalDuration = currentTime - intervalStartTime;
            // sumThenReset() already clears the counter; no additional reset is performed so that
            // messages counted right after the read are kept for the next report.
            double outboundMps = messagesPerSecond(this.outboundMessages.sumThenReset(), intervalDuration);
            double inboundMps = messagesPerSecond(this.inboundMessages.sumThenReset(), intervalDuration);
            this.printStream.printf("%,6.2f - %,6.2f s; Tx: %,8.1f m/s; Rx: %,8.1f m/s;%n", relativeIntervalStartTime, relativeIntervalEndTime, outboundMps, inboundMps);
            intervalTime.set(currentTime);
        }, 0L, INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the periodic report if one has been scheduled; a report that is currently running
     * is not interrupted.
     */
    private void stop() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
        }
    }

    /**
     * Counts outbound {@link AddressedEnvelope}s. A counted envelope is either consumed (the
     * promise is completed successfully and nothing is written) or written to the next handler,
     * depending on the outbound predicate. Any other message is written to the next handler.
     *
     * @param ctx     handler context
     * @param msg     message to write
     * @param promise promise to complete once the write is done
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (msg instanceof AddressedEnvelope) {
            AddressedEnvelope<?, ?> envelope = (AddressedEnvelope<?, ?>) msg;
            this.outboundMessages.increment();
            if (this.consumeOutbound.test(envelope.recipient(), envelope.content())) {
                // NOTE(review): a consumed envelope is not released here; presumably the
                // predicate takes ownership of reference-counted content - confirm.
                promise.setSuccess();
            } else {
                ctx.write(msg, promise);
            }
        } else {
            ctx.write(msg, promise);
        }
    }

    /**
     * Counts inbound {@link AddressedEnvelope}s. A counted envelope is passed to the next handler
     * unless the inbound predicate consumes it. Any other message is passed to the next handler.
     *
     * @param ctx handler context
     * @param msg message that was read
     */
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof AddressedEnvelope) {
            AddressedEnvelope<?, ?> envelope = (AddressedEnvelope<?, ?>) msg;
            this.inboundMessages.increment();
            // NOTE(review): a consumed envelope is not released here; presumably the predicate
            // takes ownership of reference-counted content - confirm.
            if (!this.consumeInbound.test(envelope.sender(), envelope.content())) {
                ctx.fireChannelRead(msg);
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Starts the periodic report and forwards the event.
     *
     * @param ctx handler context
     */
    public void channelActive(ChannelHandlerContext ctx) {
        this.start(ctx);
        ctx.fireChannelActive();
    }

    /**
     * Stops the periodic report and forwards the event.
     *
     * @param ctx handler context
     */
    public void channelInactive(ChannelHandlerContext ctx) {
        this.stop();
        ctx.fireChannelInactive();
    }
}

