package io.airlift.drift.transport.netty.server;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.drift.codec.ThriftCodec;
import io.airlift.drift.codec.ThriftCodecManager;
import io.airlift.drift.codec.metadata.ThriftType;
import io.airlift.drift.transport.MethodMetadata;
import io.airlift.drift.transport.ParameterMetadata;
import io.airlift.drift.transport.netty.buffer.TestingPooledByteBufAllocator;
import io.airlift.drift.transport.netty.scribe.apache.LogEntry;
import io.airlift.drift.transport.netty.scribe.apache.ResultCode;
import io.airlift.drift.transport.netty.scribe.apache.scribe;
import io.airlift.drift.transport.netty.scribe.drift.DriftLogEntry;
import io.airlift.drift.transport.netty.scribe.drift.DriftResultCode;
import io.airlift.drift.transport.server.ServerInvokeRequest;
import io.airlift.drift.transport.server.ServerMethodInvoker;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/* loaded from: input_file:io/airlift/drift/transport/netty/server/TestDriftNettyServerTransport.class */
public class TestDriftNettyServerTransport {
    /** Codec manager used to build the codecs referenced by {@link #LOG_METHOD_METADATA}. */
    private static final ThriftCodecManager CODEC_MANAGER = new ThriftCodecManager(new ThriftCodec[0]);

    /**
     * Metadata for the {@code Log} method served by the test invoker: a single parameter with field id 1
     * named {@code messages} (a list of {@link DriftLogEntry}) and a {@link DriftResultCode} result.
     *
     * NOTE(review): this file appears to be decompiler output; generic casts on the codec arguments and a
     * {@code (short)} cast on the field id may have been dropped -- confirm against the
     * {@code ParameterMetadata} and {@code MethodMetadata} constructor signatures.
     */
    private static final MethodMetadata LOG_METHOD_METADATA = new MethodMetadata(
            "Log",
            ImmutableList.of(new ParameterMetadata(
                    1,
                    "messages",
                    CODEC_MANAGER.getCodec(ThriftType.list(CODEC_MANAGER.getCodec(DriftLogEntry.class).getType())))),
            CODEC_MANAGER.getCodec(DriftResultCode.class),
            ImmutableMap.of(),
            ImmutableMap.of(),
            false,
            true);

    /** Entries sent by the Apache Thrift client in every {@code Log} request. */
    private static final List<LogEntry> MESSAGES = ImmutableList.of(
            new LogEntry("hello", "world"),
            new LogEntry("bye", "world"));

    /** The same entries as {@link #MESSAGES}, converted to the Drift-side type for comparison with what the server received. */
    private static final List<DriftLogEntry> DRIFT_MESSAGES = ImmutableList.copyOf(
            MESSAGES.stream()
                    .map(entry -> new DriftLogEntry(entry.category, entry.message))
                    .collect(Collectors.toList()));

    /**
     * {@link ServerMethodInvoker} used by the tests in this class. It only accepts the {@code Log} method:
     * every valid invocation records the received entries and hands back an unfinished
     * {@link SettableFuture}, which is also queued so the test can decide when (and in which order)
     * each request completes.
     */
    public static class TestingServerMethodInvoker implements ServerMethodInvoker {
        /** Pending results, in the order the invocations arrived; capacity is fixed at 100. */
        private final BlockingQueue<SettableFuture<Object>> futureResults = new ArrayBlockingQueue<>(100);

        /**
         * Entries received so far. The parameter is decoded with the {@link DriftLogEntry} list codec
         * declared in {@code LOG_METHOD_METADATA}, and the tests compare this list with
         * {@code DRIFT_MESSAGES}, so the element type is {@link DriftLogEntry}.
         */
        private final List<DriftLogEntry> messages = new CopyOnWriteArrayList<>();

        private TestingServerMethodInvoker() {
        }

        /** Returns the queue of not-yet-completed invocation results. */
        public BlockingQueue<SettableFuture<Object>> getFutureResults() {
            return this.futureResults;
        }

        /** Returns every entry received by {@link #invoke} so far. */
        private List<DriftLogEntry> getMessages() {
            return this.messages;
        }

        /** Returns the {@code Log} metadata when {@code str} names that method, otherwise empty. */
        public Optional<MethodMetadata> getMethodMetadata(String str) {
            if (TestDriftNettyServerTransport.LOG_METHOD_METADATA.getName().equals(str)) {
                return Optional.of(TestDriftNettyServerTransport.LOG_METHOD_METADATA);
            }
            return Optional.empty();
        }

        /**
         * Records the entries of a {@code Log} request and returns a future that stays pending until the
         * test completes it through {@link #getFutureResults()}.
         *
         * @return a failed future when the method is not {@code Log} or the parameters are not exactly one
         *         list stored under field id 1; otherwise the pending result future
         */
        @SuppressWarnings("unchecked") // the only parameter is decoded by the DriftLogEntry list codec
        public ListenableFuture<Object> invoke(ServerInvokeRequest serverInvokeRequest) {
            MethodMetadata method = serverInvokeRequest.getMethod();
            if (!TestDriftNettyServerTransport.LOG_METHOD_METADATA.getName().equals(method.getName())) {
                return Futures.immediateFailedFuture(new IllegalArgumentException("unknown method " + method));
            }

            Map<?, ?> parameters = serverInvokeRequest.getParameters();
            if (parameters.size() != 1 || !parameters.containsKey((short) 1)) {
                return Futures.immediateFailedFuture(new IllegalArgumentException("invalid parameters"));
            }
            Object argument = Iterables.getOnlyElement(parameters.values());
            if (!(argument instanceof List)) {
                return Futures.immediateFailedFuture(new IllegalArgumentException("invalid parameters"));
            }

            this.messages.addAll((List<DriftLogEntry>) argument);
            SettableFuture<Object> pendingResult = SettableFuture.create();
            this.futureResults.add(pendingResult);
            return pendingResult;
        }

        /** Intentionally a no-op: these tests do not inspect recorded results. */
        public void recordResult(String str, long j, ListenableFuture<Object> listenableFuture) {
        }
    }

    /**
     * Runs the out-of-order client scenario against a server configured to assume clients support
     * out-of-order responses, once for each combination of unframed/framed transport and
     * binary/compact protocol, then verifies the server received every entry that was sent.
     */
    @Test
    public void testOutOfOrderNot() {
        TestingServerMethodInvoker methodInvoker = new TestingServerMethodInvoker();

        // Run the clients BEFORE reading the received messages. Java evaluates call arguments left to
        // right, so taking the snapshot inside the assertEquals(...) argument list ahead of this call
        // would capture the list while it is still empty.
        int invocationCount = testServerMethodInvoker(methodInvoker, true, ImmutableList.of(
                address -> testOutOfOrder(address, MESSAGES, new TTransportFactory(), new TBinaryProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrder(address, MESSAGES, new TTransportFactory(), new TCompactProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrder(address, MESSAGES, new TFramedTransport.Factory(), new TBinaryProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrder(address, MESSAGES, new TFramedTransport.Factory(), new TCompactProtocol.Factory(), methodInvoker.getFutureResults())));

        // Each invocation carried one full copy of the messages.
        Assert.assertEquals(
                ImmutableList.copyOf(methodInvoker.getMessages()),
                Lists.newArrayList(Iterables.concat(Collections.nCopies(invocationCount, DRIFT_MESSAGES))));
    }

    /**
     * Client side of the out-of-order scenario: sends requests 11 and 22 on one connection, completes
     * the second request's future first, and expects response 22 to arrive before response 11.
     *
     * @return the number of requests sent (always 2)
     * @throws RuntimeException wrapping any {@link TException} or {@link InterruptedException}; the
     *         interrupt flag is restored in the latter case
     */
    public static int testOutOfOrder(HostAndPort hostAndPort, List<LogEntry> list, TTransportFactory tTransportFactory, TProtocolFactory tProtocolFactory, BlockingQueue<SettableFuture<Object>> blockingQueue) {
        try {
            TSocket socket = new TSocket(hostAndPort.getHost(), hostAndPort.getPort());
            socket.open();
            try {
                TProtocol protocol = tProtocolFactory.getProtocol(tTransportFactory.getTransport(socket));

                // Both requests must reach the invoker while neither has been answered.
                sendLogRequest(11, list, protocol);
                SettableFuture<Object> firstResult = blockingQueue.take();
                Assert.assertFalse(firstResult.isDone());

                sendLogRequest(22, list, protocol);
                SettableFuture<Object> secondResult = blockingQueue.take();
                Assert.assertFalse(secondResult.isDone());

                // Finish the second request first; its response must come back first.
                secondResult.set(DriftResultCode.OK);
                Assert.assertEquals(readLogResponse(22, protocol), ResultCode.OK);

                Assert.assertFalse(firstResult.isDone());
                firstResult.set(DriftResultCode.OK);
                Assert.assertEquals(readLogResponse(11, protocol), ResultCode.OK);

                return 2;
            } finally {
                socket.close();
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs the in-order client scenario against a server configured NOT to assume clients support
     * out-of-order responses, once for each combination of unframed/framed transport and
     * binary/compact protocol, then verifies the server received every entry that was sent.
     */
    @Test
    public void testOutOfOrderNotSupported() {
        TestingServerMethodInvoker methodInvoker = new TestingServerMethodInvoker();

        // Run the clients BEFORE reading the received messages. Java evaluates call arguments left to
        // right, so taking the snapshot inside the assertEquals(...) argument list ahead of this call
        // would capture the list while it is still empty.
        int invocationCount = testServerMethodInvoker(methodInvoker, false, ImmutableList.of(
                address -> testOutOfOrderNotSupported(address, MESSAGES, new TTransportFactory(), new TBinaryProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrderNotSupported(address, MESSAGES, new TTransportFactory(), new TCompactProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrderNotSupported(address, MESSAGES, new TFramedTransport.Factory(), new TBinaryProtocol.Factory(), methodInvoker.getFutureResults()),
                address -> testOutOfOrderNotSupported(address, MESSAGES, new TFramedTransport.Factory(), new TCompactProtocol.Factory(), methodInvoker.getFutureResults())));

        // Each invocation carried one full copy of the messages.
        Assert.assertEquals(
                ImmutableList.copyOf(methodInvoker.getMessages()),
                Lists.newArrayList(Iterables.concat(Collections.nCopies(invocationCount, DRIFT_MESSAGES))));
    }

    /**
     * Client side of the in-order scenario: sends requests 11 and 22 on one connection and checks that
     * the second request is not handed to the invoker (no future appears within one second) until the
     * first one has been completed and its response read.
     *
     * @return the number of requests sent (always 2)
     * @throws RuntimeException wrapping any {@link TException} or {@link InterruptedException}; the
     *         interrupt flag is restored in the latter case
     */
    public static int testOutOfOrderNotSupported(HostAndPort hostAndPort, List<LogEntry> list, TTransportFactory tTransportFactory, TProtocolFactory tProtocolFactory, BlockingQueue<SettableFuture<Object>> blockingQueue) {
        try {
            TSocket socket = new TSocket(hostAndPort.getHost(), hostAndPort.getPort());
            socket.open();
            try {
                TProtocol protocol = tProtocolFactory.getProtocol(tTransportFactory.getTransport(socket));

                sendLogRequest(11, list, protocol);
                SettableFuture<Object> firstResult = blockingQueue.take();
                Assert.assertFalse(firstResult.isDone());

                // The second request is on the wire, but must not be invoked while the first is pending.
                sendLogRequest(22, list, protocol);
                Assert.assertNull(blockingQueue.poll(1L, TimeUnit.SECONDS), "Second request future");

                Assert.assertFalse(firstResult.isDone());
                firstResult.set(DriftResultCode.OK);
                Assert.assertEquals(readLogResponse(11, protocol), ResultCode.OK);

                // Only now does the second invocation show up.
                SettableFuture<Object> secondResult = blockingQueue.take();
                Assert.assertFalse(secondResult.isDone());
                secondResult.set(DriftResultCode.OK);
                Assert.assertEquals(readLogResponse(22, protocol), ResultCode.OK);

                return 2;
            } finally {
                socket.close();
            }
        } catch (TException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts a Netty server transport backed by {@code serverMethodInvoker}, runs every client in
     * {@code list} against it in order, and shuts the transport and its buffer allocator down afterwards.
     *
     * @param z value passed to {@code setAssumeClientsSupportOutOfOrderResponses}
     * @param list clients to run; each receives the server address and returns an int
     * @return the sum of the values returned by the clients
     */
    private static int testServerMethodInvoker(ServerMethodInvoker serverMethodInvoker, boolean z, List<ToIntFunction<HostAndPort>> list) {
        DriftNettyServerConfig config = new DriftNettyServerConfig().setAssumeClientsSupportOutOfOrderResponses(z);
        TestingPooledByteBufAllocator allocator = new TestingPooledByteBufAllocator();
        DriftNettyServerTransportFactory transportFactory = new DriftNettyServerTransportFactory(config, allocator);
        DriftNettyServerTransport transport = transportFactory.createServerTransport(serverMethodInvoker);
        try {
            transport.start();
            HostAndPort address = HostAndPort.fromParts("localhost", transport.getPort());

            int total = 0;
            for (ToIntFunction<HostAndPort> client : list) {
                total += client.applyAsInt(address);
            }
            return total;
        } finally {
            transport.shutdown();
            allocator.close();
        }
    }

    /**
     * Writes one {@code Log} call carrying {@code list} to the protocol and flushes the transport.
     *
     * @param i sequence id to put in the message header
     */
    private static void sendLogRequest(int i, List<LogEntry> list, TProtocol tProtocol) throws TException {
        // Message type 1 is Thrift's TMessageType.CALL.
        TMessage header = new TMessage("Log", (byte) 1, i);
        tProtocol.writeMessageBegin(header);

        scribe.Log_args arguments = new scribe.Log_args().setMessages(list);
        arguments.write(tProtocol);

        tProtocol.writeMessageEnd();
        tProtocol.getTransport().flush();
    }

    /**
     * Reads one {@code Log} response from the protocol and returns its result code.
     *
     * @param i sequence id the response is expected to carry
     * @throws TApplicationException if the server replied with an exception message, the message is not
     *         a reply, or the sequence id differs from {@code i}
     */
    private static ResultCode readLogResponse(int i, TProtocol tProtocol) throws TException {
        TMessage header = tProtocol.readMessageBegin();
        switch (header.type) {
            case 3:
                // TMessageType.EXCEPTION: the body is a serialized TApplicationException.
                throw TApplicationException.readFrom(tProtocol);
            case 2:
                // TMessageType.REPLY: fall through to the normal decoding below.
                break;
            default:
                // 5 is TApplicationException.MISSING_RESULT.
                throw new TApplicationException(5, "request failed");
        }

        if (header.seqid != i) {
            // 4 is TApplicationException.BAD_SEQUENCE_ID.
            throw new TApplicationException(4, String.format("expected sequenceId %s, but received %s", i, header.seqid));
        }

        scribe.Log_result response = new scribe.Log_result();
        response.read(tProtocol);
        tProtocol.readMessageEnd();
        return response.success;
    }
}
