/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish.netty.pipeline;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.impl.AtomicDouble;
import io.mantisrx.publish.internal.exceptions.RetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.proto.MantisEvent;
import io.mantisrx.publish.netty.proto.MantisEventEnvelope;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectWriter;
import io.mantisrx.shaded.io.netty.buffer.ByteBufOutputStream;
import io.mantisrx.shaded.io.netty.channel.ChannelHandlerContext;
import io.mantisrx.shaded.io.netty.channel.ChannelOutboundHandlerAdapter;
import io.mantisrx.shaded.io.netty.channel.ChannelPromise;
import io.mantisrx.shaded.io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.mantisrx.shaded.io.netty.handler.codec.http.FullHttpRequest;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpHeaderValues;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpMethod;
import io.mantisrx.shaded.io.netty.handler.codec.http.HttpVersion;
import io.mantisrx.shaded.io.netty.util.concurrent.Future;
import io.mantisrx.shaded.io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbound handler that accumulates {@link MantisEvent}s into a single
 * {@link MantisEventEnvelope} and forwards the envelope downstream as one HTTP POST.
 *
 * <p>A batch is flushed in two situations:
 * <ul>
 *   <li>an incoming event would push the accumulated size past {@code flushIntervalBytes}
 *       (the batch gathered so far is flushed, then the new event starts the next batch);</li>
 *   <li>a periodic task, scheduled on the handler context's executor every
 *       {@code flushIntervalMs} milliseconds, finds a non-empty batch on an active,
 *       writable channel.</li>
 * </ul>
 *
 * <p>Batch state is held in plain, unsynchronized fields. The periodic task is scheduled on
 * {@code ctx.executor()}; NOTE(review): this presumably relies on {@link #write} being
 * invoked on that same executor — confirm against the pipeline setup.
 */
class MantisEventAggregator extends ChannelOutboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MantisEventAggregator.class);

    private final Clock clock;
    private final Timer batchFlushTime;
    private final Counter droppedBatches;
    private final Counter flushSuccess;
    private final Counter flushFailure;
    private final AtomicDouble batchSize;
    private final ObjectWriter objectWriter;
    private final boolean compress;
    private final long flushIntervalMs;
    private final int flushIntervalBytes;

    /** Handle of the periodic flush task; {@code null} until the handler is added or after removal. */
    private ScheduledFuture<?> writerTimeout;
    /** Batch currently being filled. */
    private MantisEventEnvelope currentMessage;
    /** Sum of {@code MantisEvent.size()} over the events in {@link #currentMessage}. */
    private int currentMessageSize;

    /**
     * @param registry           metrics registry used to register the timer, counters and gauge
     * @param clock              time source for envelope timestamps and flush latency
     * @param compress           when {@code true}, requests carry a {@code Content-Encoding: gzip}
     *                           header (this class does not compress the body itself)
     * @param flushIntervalMs    period, in milliseconds, of the time-based flush task
     * @param flushIntervalBytes accumulated event size above which a batch is flushed
     */
    MantisEventAggregator(Registry registry, Clock clock, boolean compress, long flushIntervalMs, int flushIntervalBytes) {
        this.clock = clock;
        this.batchFlushTime = SpectatorUtils.buildAndRegisterTimer(registry, "batchFlushTime", "channel", "netty");
        this.droppedBatches = SpectatorUtils.buildAndRegisterCounter(registry, "droppedBatches", "channel", "netty");
        this.flushSuccess = SpectatorUtils.buildAndRegisterCounter(registry, "flushSuccess", "channel", "netty");
        this.flushFailure = SpectatorUtils.buildAndRegisterCounter(registry, "flushFailure", "channel", "netty");
        this.batchSize = SpectatorUtils.buildAndRegisterGauge(registry, "batchSize", "channel", "netty");
        this.flushIntervalMs = flushIntervalMs;
        this.flushIntervalBytes = flushIntervalBytes;
        this.compress = compress;
        this.objectWriter = new ObjectMapper().writer();
        this.currentMessage = new MantisEventEnvelope(clock.millis(), "origin", new ArrayList<MantisEvent>());
    }

    /** Only {@link MantisEvent}s are aggregated; everything else is passed through untouched. */
    private boolean acceptOutboundMessage(Object msg) {
        return msg instanceof MantisEvent;
    }

    /**
     * Adds a {@link MantisEvent} to the current batch, flushing the batch first when the event
     * would exceed the byte threshold. Non-event messages are forwarded downstream as-is.
     *
     * <p>When no flush is triggered the promise is completed successfully right away (the event
     * is only buffered). When a flush is triggered the promise is completed by
     * {@link #writeBatch} with the outcome of writing the <em>previously</em> buffered batch.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (!this.acceptOutboundMessage(msg)) {
            ctx.write(msg, promise);
            return;
        }
        MantisEvent event = (MantisEvent) msg;
        int eventSize = event.size();
        if (this.currentMessageSize + eventSize > this.flushIntervalBytes) {
            // writeBatch resets currentMessage/currentMessageSize, so the event below
            // lands in a fresh batch.
            this.writeBatch(ctx, promise);
        } else {
            promise.setSuccess();
        }
        this.currentMessage.addEvent(event);
        this.currentMessageSize += eventSize;
    }

    /** Starts the periodic time-based flush on the context's executor. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.writerTimeout = ctx.executor().scheduleAtFixedRate(
                new WriterTimeoutTask(ctx), this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    /** Cancels the periodic flush (without interrupting a run in progress). */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (this.writerTimeout != null) {
            this.writerTimeout.cancel(false);
            this.writerTimeout = null;
        }
    }

    /**
     * Serializes the current batch into an HTTP request, writes and flushes it downstream, and
     * always starts a new empty batch afterwards (even if serialization fails).
     *
     * <p>Promise completion:
     * <ul>
     *   <li>write succeeded: {@code promise} succeeds and {@code flushSuccess} is incremented;</li>
     *   <li>write failed: {@code promise} fails with a {@link RetryableException} and
     *       {@code flushFailure} is incremented;</li>
     *   <li>serialization failed: the batch is dropped, {@code droppedBatches} is incremented,
     *       {@code promise} fails with the {@link IOException} and the exception is also fired
     *       through the pipeline.</li>
     * </ul>
     */
    void writeBatch(ChannelHandlerContext ctx, ChannelPromise promise) {
        try {
            FullHttpRequest request = this.buildRequest(ctx, this.currentMessage);
            int eventListSize = this.currentMessage.getEventList().size();
            this.batchSize.set((double) eventListSize);
            final long start = this.clock.millis();
            ctx.writeAndFlush(request).addListener(future -> {
                long end = this.clock.millis();
                this.batchFlushTime.record(end - start, TimeUnit.MILLISECONDS);
                if (future.isSuccess()) {
                    promise.setSuccess();
                    this.flushSuccess.increment();
                } else {
                    // cause() is null when the future was cancelled rather than failed.
                    Throwable cause = future.cause();
                    String reason = cause != null ? cause.getMessage() : "batch write was cancelled";
                    promise.setFailure(new RetryableException(reason));
                    this.flushFailure.increment();
                }
            });
        } catch (IOException e) {
            LOG.debug("unable to serialize batch", e);
            this.droppedBatches.increment();
            // Complete the promise so whoever is listening on it is not left waiting forever.
            // tryFailure (not setFailure) so an already-completed promise cannot throw here.
            promise.tryFailure(e);
            ctx.fireExceptionCaught(e);
        } finally {
            this.currentMessage = new MantisEventEnvelope(this.clock.millis(), "origin", new ArrayList<MantisEvent>());
            this.currentMessageSize = 0;
        }
    }

    /**
     * Builds a {@code POST /api/v1/events} request whose body is the JSON serialization of
     * {@code event}. The envelope's timestamp is set to the current clock time before
     * serialization.
     *
     * <p>The body buffer is a direct buffer obtained from {@code ctx.alloc()}; on success its
     * ownership passes to the returned request. If serialization fails the buffer is released
     * here before the exception propagates.
     *
     * @throws IOException if the envelope cannot be serialized
     */
    FullHttpRequest buildRequest(ChannelHandlerContext ctx, MantisEventEnvelope event) throws IOException {
        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        String urlString = "http://" + address.getHostString() + ':' + address.getPort() + "/api/v1/events";
        URI uri = URI.create(urlString);
        DefaultFullHttpRequest request = new DefaultFullHttpRequest(
                HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath(), ctx.alloc().directBuffer());
        boolean serialized = false;
        try {
            request.headers().add(HttpHeaderNames.ACCEPT, HttpHeaderValues.APPLICATION_JSON);
            request.headers().add(HttpHeaderNames.ORIGIN, "localhost");
            if (this.compress) {
                request.headers().add(HttpHeaderNames.CONTENT_ENCODING, HttpHeaderValues.GZIP);
            }
            request.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
            event.setTs(this.clock.millis());
            // Declared as OutputStream on purpose: ByteBufOutputStream is also a DataOutput,
            // and the static type selects the OutputStream overload of writeValue.
            try (java.io.OutputStream out = new ByteBufOutputStream(request.content())) {
                this.objectWriter.writeValue(out, event);
            }
            serialized = true;
            return request;
        } finally {
            if (!serialized) {
                // Nobody downstream will ever see this request, so free its buffer now.
                request.release();
            }
        }
    }

    /**
     * Periodic task that flushes a non-empty batch when the channel is active and writable.
     * The flush uses a fresh promise from the channel, so its outcome is only visible through
     * the metrics recorded by {@link MantisEventAggregator#writeBatch}.
     */
    private class WriterTimeoutTask implements Runnable {
        private final ChannelHandlerContext ctx;

        WriterTimeoutTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!this.ctx.channel().isActive()) {
                LOG.debug("channel not active");
                return;
            }
            boolean hasPendingEvents = MantisEventAggregator.this.currentMessage != null
                    && MantisEventAggregator.this.currentMessageSize != 0;
            if (this.ctx.channel().isWritable() && hasPendingEvents) {
                MantisEventAggregator.this.writeBatch(this.ctx, this.ctx.channel().newPromise());
            }
        }
    }
}

