package io.pravega.client.netty.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.netty.impl.ClientConnection;
import io.pravega.common.concurrent.FutureHelpers;
import io.pravega.shaded.com.google.common.base.Preconditions;
import io.pravega.shaded.io.netty.channel.Channel;
import io.pravega.shaded.io.netty.channel.ChannelHandlerContext;
import io.pravega.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import io.pravega.shaded.io.netty.channel.ChannelPromise;
import io.pravega.shaded.io.netty.util.concurrent.Future;
import io.pravega.shaded.io.netty.util.concurrent.GenericFutureListener;
import io.pravega.shaded.io.netty.util.concurrent.PromiseCombiner;
import io.pravega.shaded.io.netty.util.concurrent.ScheduledFuture;
import io.pravega.shared.protocol.netty.Append;
import io.pravega.shared.protocol.netty.AppendBatchSizeTracker;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.Reply;
import io.pravega.shared.protocol.netty.ReplyProcessor;
import io.pravega.shared.protocol.netty.WireCommand;
import io.pravega.shared.protocol.netty.WireCommands;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/netty/impl/ClientConnectionInboundHandler.class */
/**
 * Netty inbound handler that doubles as the {@link ClientConnection} for a single channel.
 *
 * <p>Inbound {@link Reply} messages are dispatched to the supplied {@link ReplyProcessor};
 * outbound commands are written to the currently registered {@link Channel}. While a channel
 * is registered, a periodic task sends a {@code KeepAlive} whenever no message has been sent
 * through this connection since the previous tick.
 *
 * <p>Mutable state (current channel, keep-alive handle, recent-message flag) is held in
 * atomic references so it can be read and replaced without explicit locking.
 */
public class ClientConnectionInboundHandler extends ChannelInboundHandlerAdapter implements ClientConnection {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(ClientConnectionInboundHandler.class);

    /** Delay before the first keep-alive check after the channel is registered. */
    private static final long KEEP_ALIVE_INITIAL_DELAY_SECONDS = 20L;
    /** Delay between the end of one keep-alive check and the start of the next. */
    private static final long KEEP_ALIVE_INTERVAL_SECONDS = 10L;

    private final String connectionName;
    private final ReplyProcessor processor;
    private final AtomicReference<Channel> channel = new AtomicReference<>();
    private final AtomicReference<ScheduledFuture<?>> keepAliveFuture = new AtomicReference<>();
    private final AtomicBoolean recentMessage = new AtomicBoolean(false);
    private final AppendBatchSizeTracker batchSizeTracker;

    /**
     * Periodic task that sends a {@code KeepAlive} when nothing was sent since the last run.
     * Any failure while sending is logged and the connection is closed.
     */
    private final class KeepAliveTask implements Runnable {
        @Override
        public void run() {
            try {
                // getAndSet(false) both reads and resets the flag, so every tick starts a fresh window.
                if (!recentMessage.getAndSet(false)) {
                    send(new WireCommands.KeepAlive());
                }
            } catch (Exception e) {
                // The trailing throwable makes SLF4J log the stack trace in addition to the message.
                log.warn("Keep alive failed, killing connection {} due to {} ", connectionName, e.getMessage(), e);
                close();
            }
        }
    }

    /**
     * Creates a handler for one connection.
     *
     * @param str                    name of the connection, used in log and error messages
     * @param replyProcessor         receiver for replies and failure notifications; must not be null
     * @param appendBatchSizeTracker tracker informed of every append sent and every ack received;
     *                               must not be null
     * @throws NullPointerException if {@code replyProcessor} or {@code appendBatchSizeTracker} is null
     */
    public ClientConnectionInboundHandler(String str, ReplyProcessor replyProcessor, AppendBatchSizeTracker appendBatchSizeTracker) {
        Preconditions.checkNotNull(replyProcessor);
        Preconditions.checkNotNull(appendBatchSizeTracker);
        this.connectionName = str;
        this.processor = replyProcessor;
        this.batchSizeTracker = appendBatchSizeTracker;
    }

    /**
     * Records the newly registered channel, writes the initial {@code Hello} and (re)schedules
     * the keep-alive task on the channel's event loop.
     *
     * @param channelHandlerContext context of the channel being registered
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
        Channel registered = channelHandlerContext.channel();
        this.channel.set(registered);
        log.info("Connection established {} ", channelHandlerContext);
        // NOTE(review): the two 1s are presumably the wire version and the oldest compatible
        // version; confirm against WireCommands before replacing them with named constants.
        registered.write(new WireCommands.Hello(1, 1), registered.voidPromise());
        ScheduledFuture<?> scheduled = registered.eventLoop().scheduleWithFixedDelay(
                new KeepAliveTask(), KEEP_ALIVE_INITIAL_DELAY_SECONDS, KEEP_ALIVE_INTERVAL_SECONDS, TimeUnit.SECONDS);
        // If a task from an earlier registration is still around, stop it so only one is active.
        ScheduledFuture<?> previous = this.keepAliveFuture.getAndSet(scheduled);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /**
     * Cancels the keep-alive task, forgets the channel and tells the processor that the
     * connection was dropped.
     *
     * @param channelHandlerContext context of the channel being unregistered
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        // Clear the reference while cancelling so the stale future is not retained or re-cancelled.
        ScheduledFuture<?> scheduled = this.keepAliveFuture.getAndSet(null);
        if (scheduled != null) {
            scheduled.cancel(false);
        }
        this.channel.set(null);
        this.processor.connectionDropped();
        super.channelUnregistered(channelHandlerContext);
    }

    /**
     * Dispatches an inbound reply to the processor. {@code DataAppended} replies are reported
     * to the batch size tracker first. Exceptions thrown while processing are forwarded to
     * {@link ReplyProcessor#processingFailure}.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param obj                   the inbound message; cast to {@link Reply}
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) {
        Reply reply = (Reply) obj;
        log.debug("{} processing reply: {}", this.connectionName, reply);
        if (reply instanceof WireCommands.DataAppended) {
            this.batchSizeTracker.recordAck(((WireCommands.DataAppended) reply).getEventNumber());
        }
        try {
            reply.process(this.processor);
        } catch (Exception e) {
            this.processor.processingFailure(e);
        }
    }

    /**
     * Reports a pipeline exception to the processor, wrapped in a {@link ConnectionFailedException}.
     *
     * @param channelHandlerContext context of the channel on which the error occurred
     * @param th                    the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        this.processor.processingFailure(new ConnectionFailedException(th));
    }

    /**
     * Writes and flushes a command, handing the resulting write future to
     * {@code FutureHelpers.getAndHandleExceptions} so failures surface as
     * {@link ConnectionFailedException}.
     *
     * @param wireCommand the command to send
     * @throws ConnectionFailedException if no channel is registered or the write fails
     */
    @Override
    public void send(WireCommand wireCommand) throws ConnectionFailedException {
        this.recentMessage.set(true);
        FutureHelpers.getAndHandleExceptions(getChannel().writeAndFlush(wireCommand), ConnectionFailedException::new);
    }

    /**
     * Records the append with the batch size tracker, then writes and flushes it in the same
     * way as {@link #send(WireCommand)}.
     *
     * @param append the append to send
     * @throws ConnectionFailedException if no channel is registered or the write fails
     */
    @Override
    public void send(Append append) throws ConnectionFailedException {
        this.recentMessage.set(true);
        this.batchSizeTracker.recordAppend(append.getEventNumber(), append.getData().readableBytes());
        FutureHelpers.getAndHandleExceptions(getChannel().writeAndFlush(append), ConnectionFailedException::new);
    }

    /**
     * Writes and flushes a command using the channel's void promise, so the outcome of the
     * write is not observed here.
     *
     * @param wireCommand the command to send
     * @throws ConnectionFailedException if no channel is registered, or if the write call
     *                                   itself throws a {@link RuntimeException}
     */
    @Override
    public void sendAsync(WireCommand wireCommand) throws ConnectionFailedException {
        this.recentMessage.set(true);
        Channel target = getChannel();
        try {
            target.writeAndFlush(wireCommand, target.voidPromise());
        } catch (RuntimeException e) {
            throw new ConnectionFailedException(e);
        }
    }

    /**
     * Writes all appends, flushes once, and invokes the callback when the combined write
     * completes.
     *
     * @param list              the appends to write, in order
     * @param completedCallback invoked with {@code null} on success, or with a
     *                          {@link ConnectionFailedException} if no channel is registered
     *                          or the combined write reports a cause
     */
    @Override
    public void sendAsync(List<Append> list, final ClientConnection.CompletedCallback completedCallback) {
        this.recentMessage.set(true);
        Channel target = this.channel.get();
        if (target == null) {
            completedCallback.complete(new ConnectionFailedException("Connection to " + this.connectionName + " is not established."));
            return;
        }
        PromiseCombiner combiner = new PromiseCombiner();
        for (Append append : list) {
            this.batchSizeTracker.recordAppend(append.getEventNumber(), append.getData().readableBytes());
            combiner.add(target.write(append));
        }
        target.flush();
        ChannelPromise aggregate = target.newPromise();
        aggregate.addListener(future -> {
            Throwable cause = future.cause();
            completedCallback.complete(cause == null ? null : new ConnectionFailedException(cause));
        });
        combiner.finish(aggregate);
    }

    /**
     * Closes the current channel, if one is registered; otherwise does nothing.
     */
    @Override
    public void close() {
        Channel current = this.channel.get();
        if (current != null) {
            current.close();
        }
    }

    /**
     * Returns the currently registered channel.
     *
     * @return the registered channel, never null
     * @throws ConnectionFailedException if no channel is currently registered
     */
    private Channel getChannel() throws ConnectionFailedException {
        Channel current = this.channel.get();
        if (current == null) {
            throw new ConnectionFailedException("Connection to " + this.connectionName + " is not established.");
        }
        return current;
    }
}
