package org.drasyl.pipeline.handler;

import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import org.drasyl.event.Event;
import org.drasyl.event.NodeDownEvent;
import org.drasyl.event.NodeUnrecoverableErrorEvent;
import org.drasyl.event.NodeUpEvent;
import org.drasyl.pipeline.HandlerContext;
import org.drasyl.pipeline.address.Address;
import org.drasyl.pipeline.skeleton.SimpleDuplexHandler;
import org.drasyl.util.ReferenceCountUtil;

/* loaded from: input_file:org/drasyl/pipeline/handler/MessagesThroughputHandler.class */
/**
 * Pipeline handler that counts the messages passing through it in both directions and
 * periodically prints the resulting throughput (messages per second) to a {@link PrintStream}.
 *
 * <p>Reporting is started when a {@link NodeUpEvent} passes by and stopped on
 * {@link NodeDownEvent} or {@link NodeUnrecoverableErrorEvent}. Events are always forwarded to
 * the next handler.
 *
 * <p>Two predicates decide whether a counted message is consumed by this handler (its future is
 * completed and it is not passed on) or forwarded along the pipeline.
 */
public class MessagesThroughputHandler extends SimpleDuplexHandler<Object, Object, Address> {
    public static final String MESSAGES_THROUGHPUT_HANDLER = "MESSAGES_THROUGHPUT_HANDLER";
    /** Period between two throughput printouts. */
    public static final Duration INTERVAL = Duration.ofSeconds(1);
    private static final double MILLIS_PER_SECOND = 1000.0d;
    private final BiPredicate<Address, Object> consumeOutbound;
    private final BiPredicate<Address, Object> consumeInbound;
    private final LongAdder outboundMessages;
    private final LongAdder inboundMessages;
    private final Scheduler scheduler;
    private final PrintStream printStream;
    /** Handle of the running periodic report task; {@code null} while no report is scheduled. */
    private Disposable disposable;

    /**
     * Creates a handler with all collaborators supplied by the caller.
     *
     * @param consumeOutbound  returns {@code true} for outbound messages that should be consumed
     *                         instead of being written to the next handler
     * @param consumeInbound   returns {@code true} for inbound messages that should be consumed
     *                         instead of being passed to the next handler
     * @param outboundMessages counter for outbound messages
     * @param inboundMessages  counter for inbound messages
     * @param scheduler        scheduler the periodic report task is scheduled on
     * @param printStream      stream the throughput lines are printed to
     * @param disposable       handle of an already running report task, or {@code null}
     * @throws NullPointerException if any argument other than {@code disposable} is {@code null}
     */
    MessagesThroughputHandler(final BiPredicate<Address, Object> consumeOutbound,
                              final BiPredicate<Address, Object> consumeInbound,
                              final LongAdder outboundMessages,
                              final LongAdder inboundMessages,
                              final Scheduler scheduler,
                              final PrintStream printStream,
                              final Disposable disposable) {
        this.consumeOutbound = Objects.requireNonNull(consumeOutbound);
        this.consumeInbound = Objects.requireNonNull(consumeInbound);
        this.outboundMessages = Objects.requireNonNull(outboundMessages);
        this.inboundMessages = Objects.requireNonNull(inboundMessages);
        this.scheduler = Objects.requireNonNull(scheduler);
        this.printStream = Objects.requireNonNull(printStream);
        this.disposable = disposable;
    }

    /**
     * Creates a handler that prints to {@link System#out} using the given scheduler.
     *
     * @param consumeOutbound returns {@code true} for outbound messages that should be consumed
     * @param consumeInbound  returns {@code true} for inbound messages that should be consumed
     * @param scheduler       scheduler the periodic report task is scheduled on
     */
    public MessagesThroughputHandler(final BiPredicate<Address, Object> consumeOutbound,
                                     final BiPredicate<Address, Object> consumeInbound,
                                     final Scheduler scheduler) {
        this(consumeOutbound, consumeInbound, new LongAdder(), new LongAdder(), scheduler, System.out, null);
    }

    /**
     * Creates a handler that prints to {@link System#out} using {@link Schedulers#single()}.
     *
     * @param consumeOutbound returns {@code true} for outbound messages that should be consumed
     * @param consumeInbound  returns {@code true} for inbound messages that should be consumed
     */
    public MessagesThroughputHandler(final BiPredicate<Address, Object> consumeOutbound,
                                     final BiPredicate<Address, Object> consumeInbound) {
        this(consumeOutbound, consumeInbound, Schedulers.single());
    }

    /**
     * Creates a handler that consumes no messages (everything is counted and forwarded) and
     * prints to {@link System#out} using {@link Schedulers#single()}.
     */
    public MessagesThroughputHandler() {
        this((address, msg) -> false, (address, msg) -> false, Schedulers.single());
    }

    /**
     * Starts the periodic report on {@link NodeUpEvent} (if none is running) and stops it on
     * {@link NodeUnrecoverableErrorEvent} or {@link NodeDownEvent} (if one is running). The event
     * is forwarded to the next handler in every case.
     */
    @Override
    public void eventTriggered(final HandlerContext handlerContext,
                               final Event event,
                               final CompletableFuture<Void> completableFuture) {
        if (event instanceof NodeUpEvent && disposable == null) {
            start();
        }
        else if ((event instanceof NodeUnrecoverableErrorEvent || event instanceof NodeDownEvent) && disposable != null) {
            stop();
        }
        handlerContext.fireEventTriggered(event, completableFuture);
    }

    /**
     * Schedules the periodic task that drains both counters and prints one throughput line per
     * {@link #INTERVAL}. The first execution is scheduled with no initial delay.
     */
    private void start() {
        final long startTime = System.currentTimeMillis();
        // Mutable holder for the start of the current reporting interval; captured by the lambda.
        final AtomicLong intervalStart = new AtomicLong(startTime);
        disposable = scheduler.schedulePeriodicallyDirect(() -> {
            final long now = System.currentTimeMillis();
            final long previous = intervalStart.get();
            final long intervalMillis = now - previous;

            // Drain each counter exactly once. Resetting a counter before reading it would
            // discard the messages of the interval that is about to be reported.
            final double outboundRate = messagesPerSecond(outboundMessages.sumThenReset(), intervalMillis);
            final double inboundRate = messagesPerSecond(inboundMessages.sumThenReset(), intervalMillis);

            printStream.printf("%,6.2f - %,6.2f s; Tx: %,8.1f m/s; Rx: %,8.1f m/s;%n",
                    (previous - startTime) / MILLIS_PER_SECOND,
                    (now - startTime) / MILLIS_PER_SECOND,
                    outboundRate,
                    inboundRate);
            intervalStart.set(now);
        }, 0L, INTERVAL.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Converts a message count observed during an interval into a per-second rate.
     *
     * @param messages       number of messages counted during the interval
     * @param intervalMillis length of the interval in milliseconds
     * @return messages per second, or {@code 0.0} if the interval is not positive (e.g. on the
     * very first tick or after the wall clock went backwards)
     */
    private static double messagesPerSecond(final long messages, final long intervalMillis) {
        if (intervalMillis <= 0L) {
            return 0.0d;
        }
        return messages * MILLIS_PER_SECOND / intervalMillis;
    }

    /**
     * Cancels the periodic report task, if any, and clears the handle so that a later
     * {@link NodeUpEvent} can start a new one.
     */
    private void stop() {
        final Disposable running = disposable;
        if (running != null) {
            disposable = null;
            running.dispose();
        }
    }

    /**
     * Counts the outbound message, then either consumes it (completing the future) or writes it
     * to the next handler, depending on the outbound predicate.
     */
    @Override
    protected void matchedWrite(final HandlerContext handlerContext,
                                final Address address,
                                final Object obj,
                                final CompletableFuture<Void> completableFuture) {
        outboundMessages.increment();
        if (consumeOutbound.test(address, obj)) {
            completableFuture.complete(null);
        }
        else {
            handlerContext.write(address, obj, completableFuture);
        }
    }

    /**
     * Counts the inbound message, then either passes it to the next handler or consumes it,
     * depending on the inbound predicate. A consumed message has its future completed and is
     * handed to {@link ReferenceCountUtil#safeRelease(Object)}.
     */
    @Override
    protected void matchedRead(final HandlerContext handlerContext,
                               final Address address,
                               final Object obj,
                               final CompletableFuture<Void> completableFuture) {
        inboundMessages.increment();
        if (!consumeInbound.test(address, obj)) {
            handlerContext.fireRead(address, obj, completableFuture);
            return;
        }
        try {
            completableFuture.complete(null);
        }
        finally {
            // The message ends here, so it is released even if completing the future throws.
            ReferenceCountUtil.safeRelease(obj);
        }
    }
}
