package io.airlift.drift.transport.netty;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractFuture;
import io.airlift.drift.TException;
import io.airlift.drift.protocol.TTransportException;
import io.airlift.drift.transport.MethodMetadata;
import io.airlift.units.Duration;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;

/* JADX INFO: Access modifiers changed from: package-private */
@ThreadSafe
/* loaded from: input_file:io/airlift/drift/transport/netty/ThriftClientHandler.class */
public class ThriftClientHandler extends ChannelDuplexHandler {
    private static final int ONEWAY_SEQUENCE_ID = -1;
    private final Duration requestTimeout;
    private final MessageEncoding messageEncoding;
    private final ConcurrentHashMap<Integer, RequestHandler> pendingRequests = new ConcurrentHashMap<>();
    private final AtomicReference<TException> channelError = new AtomicReference<>();
    private final AtomicInteger sequenceId = new AtomicInteger(42);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/airlift/drift/transport/netty/ThriftClientHandler$RequestHandler.class */
    public final class RequestHandler {
        private final ThriftRequest thriftRequest;
        private final int sequenceId;
        private final AtomicBoolean finished = new AtomicBoolean();
        private final AtomicReference<ScheduledFuture<?>> timeout = new AtomicReference<>();

        public RequestHandler(ThriftRequest thriftRequest, int i) {
            this.thriftRequest = thriftRequest;
            this.sequenceId = i;
        }

        void registerRequestTimeout(EventExecutor eventExecutor) {
            try {
                this.timeout.set(eventExecutor.schedule(() -> {
                    onChannelError(new TTransportException("Timed out waiting " + ThriftClientHandler.this.requestTimeout + " to receive response"));
                }, ThriftClientHandler.this.requestTimeout.toMillis(), TimeUnit.MILLISECONDS));
            } catch (Throwable th) {
                onChannelError(new TTransportException("Unable to schedule request timeout", th));
                throw th;
            }
        }

        ByteBuf encodeRequest(ByteBufAllocator byteBufAllocator) throws Exception {
            try {
                return ThriftClientHandler.this.messageEncoding.writeRequest(byteBufAllocator, this.sequenceId, this.thriftRequest.getMethod(), this.thriftRequest.getParameters(), ImmutableMap.of());
            } catch (Throwable th) {
                onChannelError(th);
                throw th;
            }
        }

        void onRequestSent() {
            if (this.thriftRequest.isOneway() && this.finished.compareAndSet(false, true)) {
                try {
                    cancelRequestTimeout();
                    this.thriftRequest.setResponse(null);
                } catch (Throwable th) {
                    onChannelError(th);
                }
            }
        }

        void onResponseReceived(ByteBuf byteBuf) {
            if (this.finished.compareAndSet(false, true)) {
                try {
                    cancelRequestTimeout();
                    this.thriftRequest.setResponse(ThriftClientHandler.this.messageEncoding.readResponse(byteBuf, this.sequenceId, this.thriftRequest.getMethod()));
                } catch (Throwable th) {
                    this.thriftRequest.failed(th);
                }
            }
        }

        void onChannelError(Throwable th) {
            if (this.finished.compareAndSet(false, true)) {
                try {
                    cancelRequestTimeout();
                } finally {
                    this.thriftRequest.failed(th);
                }
            }
        }

        private void cancelRequestTimeout() {
            ScheduledFuture<?> scheduledFuture = this.timeout.get();
            if (scheduledFuture != null) {
                scheduledFuture.cancel(false);
            }
        }
    }

    /* loaded from: input_file:io/airlift/drift/transport/netty/ThriftClientHandler$ThriftRequest.class */
    public static class ThriftRequest extends AbstractFuture<Object> {
        private final MethodMetadata method;
        private final List<Object> parameters;

        public ThriftRequest(MethodMetadata methodMetadata, List<Object> list) {
            this.method = methodMetadata;
            this.parameters = list;
        }

        MethodMetadata getMethod() {
            return this.method;
        }

        List<Object> getParameters() {
            return this.parameters;
        }

        boolean isOneway() {
            return this.method.isOneway();
        }

        void setResponse(Object obj) {
            set(obj);
        }

        void failed(Throwable th) {
            setException(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ThriftClientHandler(Duration duration, MessageEncoding messageEncoding) {
        this.requestTimeout = (Duration) Objects.requireNonNull(duration, "requestTimeout is null");
        this.messageEncoding = (MessageEncoding) Objects.requireNonNull(messageEncoding, "messageEncoding is null");
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        if (obj instanceof ThriftRequest) {
            sendMessage(channelHandlerContext, (ThriftRequest) obj, channelPromise);
        } else {
            channelHandlerContext.write(obj, channelPromise);
        }
    }

    private void sendMessage(ChannelHandlerContext channelHandlerContext, ThriftRequest thriftRequest, ChannelPromise channelPromise) throws Exception {
        int incrementAndGet = thriftRequest.isOneway() ? ONEWAY_SEQUENCE_ID : this.sequenceId.incrementAndGet();
        RequestHandler requestHandler = new RequestHandler(thriftRequest, incrementAndGet);
        requestHandler.registerRequestTimeout(channelHandlerContext.executor());
        ByteBuf encodeRequest = requestHandler.encodeRequest(channelHandlerContext.alloc());
        if (!thriftRequest.isOneway() && this.pendingRequests.putIfAbsent(Integer.valueOf(incrementAndGet), requestHandler) != null) {
            requestHandler.onChannelError(new TTransportException("Another request with the same sequenceId is already in progress"));
        }
        try {
            ChannelFuture write = channelHandlerContext.write(encodeRequest, channelPromise);
            write.addListener(future -> {
                messageSent(channelHandlerContext, write, requestHandler);
            });
        } catch (Throwable th) {
            onError(channelHandlerContext, th);
        }
    }

    private void messageSent(ChannelHandlerContext channelHandlerContext, ChannelFuture channelFuture, RequestHandler requestHandler) {
        try {
            if (channelFuture.isSuccess()) {
                requestHandler.onRequestSent();
            } else {
                onError(channelHandlerContext, new TTransportException("Sending request failed", channelFuture.cause()));
            }
        } catch (Throwable th) {
            onError(channelHandlerContext, th);
        }
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof ByteBuf) && ((ByteBuf) obj).isReadable()) {
            ByteBuf byteBuf = (ByteBuf) obj;
            if (byteBuf.isReadable()) {
                messageReceived(channelHandlerContext, byteBuf);
                return;
            }
        }
        channelHandlerContext.fireChannelRead(obj);
    }

    private void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        try {
            OptionalInt extractResponseSequenceId = this.messageEncoding.extractResponseSequenceId(byteBuf);
            if (!extractResponseSequenceId.isPresent()) {
                throw new TTransportException("Could not find sequenceId in Thrift message");
            }
            RequestHandler remove = this.pendingRequests.remove(Integer.valueOf(extractResponseSequenceId.getAsInt()));
            if (remove == null) {
                throw new TTransportException("Unknown sequence id in response: " + extractResponseSequenceId.getAsInt());
            }
            remove.onResponseReceived(byteBuf);
        } catch (Throwable th) {
            onError(channelHandlerContext, th);
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        onError(channelHandlerContext, th);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (this.pendingRequests.isEmpty()) {
            return;
        }
        onError(channelHandlerContext, new TTransportException("Client was disconnected by server"));
    }

    private void onError(ChannelHandlerContext channelHandlerContext, Throwable th) {
        TException tTransportException = th instanceof TException ? (TException) th : new TTransportException(th);
        if (!this.channelError.compareAndSet(null, tTransportException)) {
            return;
        }
        while (!this.pendingRequests.isEmpty()) {
            Iterator<RequestHandler> it = this.pendingRequests.values().iterator();
            while (it.hasNext()) {
                RequestHandler next = it.next();
                it.remove();
                next.onChannelError(tTransportException);
            }
        }
        channelHandlerContext.close();
    }
}
