/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish.netty.pipeline;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import com.netflix.spectator.impl.AtomicDouble;
import io.mantisrx.publish.internal.exceptions.RetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.proto.MantisEvent;
import io.mantisrx.publish.netty.proto.MantisEventEnvelope;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Clock;
import java.util.ArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class MantisEventAggregator
extends ChannelOutboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(MantisEventAggregator.class);
    private final Clock clock;
    private final Timer batchFlushTime;
    private final Counter droppedBatches;
    private final Counter flushSuccess;
    private final Counter flushFailure;
    private final AtomicDouble batchSize;
    private final ObjectWriter objectWriter;
    private final boolean compress;
    private ScheduledFuture<?> writerTimeout;
    private long flushIntervalMs;
    private int flushIntervalBytes;
    private MantisEventEnvelope currentMessage;
    private int currentMessageSize;

    MantisEventAggregator(Registry registry, Clock clock, boolean compress, long flushIntervalMs, int flushIntervalBytes) {
        this.clock = clock;
        this.batchFlushTime = SpectatorUtils.buildAndRegisterTimer((Registry)registry, (String)"batchFlushTime", (String[])new String[]{"channel", "netty"});
        this.droppedBatches = SpectatorUtils.buildAndRegisterCounter((Registry)registry, (String)"droppedBatches", (String[])new String[]{"channel", "netty"});
        this.flushSuccess = SpectatorUtils.buildAndRegisterCounter((Registry)registry, (String)"flushSuccess", (String[])new String[]{"channel", "netty"});
        this.flushFailure = SpectatorUtils.buildAndRegisterCounter((Registry)registry, (String)"flushFailure", (String[])new String[]{"channel", "netty"});
        this.batchSize = SpectatorUtils.buildAndRegisterGauge((Registry)registry, (String)"batchSize", (String[])new String[]{"channel", "netty"});
        this.flushIntervalMs = flushIntervalMs;
        this.flushIntervalBytes = flushIntervalBytes;
        this.compress = compress;
        this.objectWriter = new ObjectMapper().writer();
        this.currentMessage = new MantisEventEnvelope(clock.millis(), "origin", new ArrayList<MantisEvent>());
    }

    private boolean acceptOutboundMessage(Object msg) {
        return msg instanceof MantisEvent;
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        if (this.acceptOutboundMessage(msg)) {
            MantisEvent event = (MantisEvent)msg;
            int eventSize = event.size();
            if (this.currentMessageSize + eventSize > this.flushIntervalBytes) {
                this.writeBatch(ctx, promise);
            } else {
                promise.setSuccess();
            }
            this.currentMessage.addEvent(event);
            this.currentMessageSize += event.size();
        } else {
            ctx.write(msg, promise);
        }
    }

    public void handlerAdded(ChannelHandlerContext ctx) {
        this.writerTimeout = ctx.executor().scheduleAtFixedRate((Runnable)new WriterTimeoutTask(ctx), this.flushIntervalMs, this.flushIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) {
        if (this.writerTimeout != null) {
            this.writerTimeout.cancel(false);
            this.writerTimeout = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void writeBatch(ChannelHandlerContext ctx, ChannelPromise promise) {
        try {
            FullHttpRequest request = this.buildRequest(ctx, this.currentMessage);
            int eventListSize = this.currentMessage.getEventList().size();
            this.batchSize.set((double)eventListSize);
            long start = this.clock.millis();
            ctx.writeAndFlush((Object)request).addListener(future -> {
                long end = this.clock.millis();
                this.batchFlushTime.record(end - start, TimeUnit.MILLISECONDS);
                if (future.isSuccess()) {
                    promise.setSuccess();
                    this.flushSuccess.increment();
                } else {
                    promise.setFailure((Throwable)new RetryableException(future.cause().getMessage()));
                    this.flushFailure.increment();
                }
            });
        }
        catch (IOException e) {
            LOG.debug("unable to serialize batch", (Throwable)e);
            this.droppedBatches.increment();
            ctx.fireExceptionCaught((Throwable)e);
        }
        finally {
            this.currentMessage = new MantisEventEnvelope(this.clock.millis(), "origin", new ArrayList<MantisEvent>());
            this.currentMessageSize = 0;
        }
    }

    FullHttpRequest buildRequest(ChannelHandlerContext ctx, MantisEventEnvelope event) throws IOException {
        InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress();
        String urlString = "http://" + address.getHostString() + ':' + address.getPort() + "/api/v1/events";
        URI uri = URI.create(urlString);
        DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath(), ctx.alloc().directBuffer());
        request.headers().add((CharSequence)HttpHeaderNames.ACCEPT, (Object)HttpHeaderValues.APPLICATION_JSON);
        request.headers().add((CharSequence)HttpHeaderNames.ORIGIN, (Object)"localhost");
        if (this.compress) {
            request.headers().add((CharSequence)HttpHeaderNames.CONTENT_ENCODING, (Object)HttpHeaderValues.GZIP);
        }
        request.headers().add((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (Object)HttpHeaderValues.CHUNKED);
        event.setTs(this.clock.millis());
        try (ByteBufOutputStream bbos = new ByteBufOutputStream(request.content());){
            this.objectWriter.writeValue((OutputStream)bbos, (Object)event);
        }
        return request;
    }

    private class WriterTimeoutTask
    implements Runnable {
        private final ChannelHandlerContext ctx;

        WriterTimeoutTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!this.ctx.channel().isActive()) {
                LOG.debug("channel not active");
                return;
            }
            if (this.ctx.channel().isWritable() && MantisEventAggregator.this.currentMessage != null && MantisEventAggregator.this.currentMessageSize != 0) {
                MantisEventAggregator.this.writeBatch(this.ctx, this.ctx.channel().newPromise());
            }
        }
    }
}

