/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.eventbus.bridge.tcp;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
import io.vertx.ext.eventbus.bridge.tcp.TcpEventBusBridge;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=VertxUnitRunner.class)
public class TcpEventBusBridgeTest {
    private Vertx vertx;
    private volatile Handler<BridgeEvent> eventHandler = event -> event.complete((Object)true);

    @Before
    public void before(TestContext context) {
        this.vertx = Vertx.vertx();
        Async async = context.async();
        this.vertx.eventBus().consumer("hello", msg -> msg.reply((Object)new JsonObject().put("value", (Object)("Hello " + ((JsonObject)msg.body()).getString("value")))));
        this.vertx.eventBus().consumer("echo", msg -> msg.reply(msg.body()));
        this.vertx.setPeriodic(1000L, __ -> this.vertx.eventBus().send("ping", (Object)new JsonObject().put("value", (Object)"hi")));
        TcpEventBusBridge bridge = TcpEventBusBridge.create((Vertx)this.vertx, (BridgeOptions)new BridgeOptions().addInboundPermitted(new PermittedOptions().setAddress("hello")).addInboundPermitted(new PermittedOptions().setAddress("echo")).addInboundPermitted(new PermittedOptions().setAddress("test")).addOutboundPermitted(new PermittedOptions().setAddress("echo")).addOutboundPermitted(new PermittedOptions().setAddress("test")).addOutboundPermitted(new PermittedOptions().setAddress("ping")), (NetServerOptions)new NetServerOptions(), event -> this.eventHandler.handle(event));
        bridge.listen(7000, res -> {
            context.assertTrue(res.succeeded());
            async.complete();
        });
    }

    @After
    public void after(TestContext context) {
        this.vertx.close(context.asyncAssertSuccess());
    }

    @Test
    public void testSendVoidMessage(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        this.vertx.eventBus().consumer("test", msg -> {
            client.close();
            async.complete();
        });
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> FrameHelper.sendFrame((String)"send", (String)"test", (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket)));
    }

    @Test
    public void testNoHandlers(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)"#backtrack", (Object)frame.getString("address"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"send", (String)"test", (String)"#backtrack", (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testErrorReply(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        this.vertx.eventBus().consumer("test", msg -> msg.fail(0, "oops!"));
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)"#backtrack", (Object)frame.getString("address"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"send", (String)"test", (String)"#backtrack", (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testSendsFromOtherSideOfBridge(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertNotEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)true, (Object)frame.getBoolean("send"));
                context.assertEquals((Object)"hi", (Object)frame.getJsonObject("body").getString("value"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"ping", null, (WriteStream)socket);
        }));
    }

    @Test
    public void testSendMessageWithReplyBacktrack(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertNotEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)true, (Object)frame.getBoolean("send"));
                context.assertEquals((Object)"Hello vert.x", (Object)frame.getJsonObject("body").getString("value"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"send", (String)"hello", (String)"#backtrack", (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testSendMessageWithReplyBacktrackTimeout(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        this.vertx.eventBus().consumer("test", msg -> {});
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)"TIMEOUT", (Object)frame.getString("failureType"));
                context.assertEquals((Object)-1, (Object)frame.getInteger("failureCode"));
                context.assertEquals((Object)"#backtrack", (Object)frame.getString("address"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            JsonObject headers = new JsonObject().put("timeout", (Object)100L);
            FrameHelper.sendFrame((String)"send", (String)"test", (String)"#backtrack", (JsonObject)headers, null, (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testSendMessageWithDuplicateReplyID(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            this.vertx.eventBus().consumer("third-party-receiver", msg -> context.fail());
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"send", (String)"hello", (String)"third-party-receiver", (JsonObject)new JsonObject().put("value", (Object)"vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testRegister(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                context.assertNotEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)false, (Object)frame.getBoolean("send"));
                context.assertEquals((Object)"Vert.x", (Object)frame.getJsonObject("body").getString("value"));
                client.close();
                async.complete();
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"echo", null, (WriteStream)socket);
            FrameHelper.sendFrame((String)"publish", (String)"echo", (JsonObject)new JsonObject().put("value", (Object)"Vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testUnRegister(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        String address = "test";
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            AtomicBoolean unregistered = new AtomicBoolean(false);
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                if (unregistered.get()) {
                    context.assertEquals((Object)"err", (Object)frame.getString("type"));
                    context.assertEquals((Object)"#backtrack", (Object)frame.getString("address"));
                    context.assertEquals((Object)"NO_HANDLERS", (Object)frame.getString("failureType"));
                    context.assertEquals((Object)"No handlers for address test", (Object)frame.getString("message"));
                    client.close();
                    async.complete();
                } else {
                    context.assertNotEquals((Object)"err", (Object)frame.getString("type"));
                    context.assertEquals((Object)false, (Object)frame.getBoolean("send"));
                    context.assertEquals((Object)"Vert.x", (Object)frame.getJsonObject("body").getString("value"));
                    unregistered.compareAndSet(false, true);
                    FrameHelper.sendFrame((String)"unregister", (String)"test", null, (WriteStream)socket);
                    FrameHelper.sendFrame((String)"send", (String)"test", (String)"#backtrack", (JsonObject)new JsonObject().put("value", (Object)"This will fail anyway!"), (WriteStream)socket);
                }
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"test", null, (WriteStream)socket);
            FrameHelper.sendFrame((String)"publish", (String)"test", (JsonObject)new JsonObject().put("value", (Object)"Vert.x"), (WriteStream)socket);
        }));
    }

    @Test
    public void testReplyFromClient(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        String address = "test";
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                if ("message".equals(frame.getString("type"))) {
                    context.assertEquals((Object)true, (Object)frame.getBoolean("send"));
                    context.assertEquals((Object)"Vert.x", (Object)frame.getJsonObject("body").getString("value"));
                    FrameHelper.sendFrame((String)"send", (String)frame.getString("replyAddress"), (JsonObject)new JsonObject().put("value", (Object)"You got it"), (WriteStream)socket);
                }
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"test", null, (WriteStream)socket);
            this.vertx.setTimer(500L, timerId -> this.vertx.eventBus().request("test", (Object)new JsonObject().put("value", (Object)"Vert.x"), respMessage -> {
                context.assertTrue(respMessage.succeeded());
                context.assertEquals((Object)"You got it", (Object)((JsonObject)((Message)respMessage.result()).body()).getString("value"));
                client.close();
                async.complete();
            }));
        }));
    }

    @Test
    public void testFailFromClient(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        String address = "test";
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            FrameParser parser = new FrameParser(parse -> {
                context.assertTrue(parse.succeeded());
                JsonObject frame = (JsonObject)parse.result();
                if ("message".equals(frame.getString("type"))) {
                    context.assertEquals((Object)true, (Object)frame.getBoolean("send"));
                    context.assertEquals((Object)"Vert.x", (Object)frame.getJsonObject("body").getString("value"));
                    FrameHelper.writeFrame((JsonObject)new JsonObject().put("type", (Object)"send").put("address", (Object)frame.getString("replyAddress")).put("failureCode", (Object)1234).put("message", (Object)"ooops!"), (WriteStream)socket);
                }
            });
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"test", null, (WriteStream)socket);
            this.vertx.setTimer(500L, timerId -> this.vertx.eventBus().request("test", (Object)new JsonObject().put("value", (Object)"Vert.x"), respMessage -> {
                context.assertTrue(respMessage.failed());
                context.assertEquals((Object)"ooops!", (Object)respMessage.cause().getMessage());
                client.close();
                async.complete();
            }));
        }));
    }

    @Test
    public void testSendPing(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        FrameParser parser = new FrameParser(parse -> {
            context.assertTrue(parse.succeeded());
            JsonObject frame = (JsonObject)parse.result();
            context.assertEquals((Object)"pong", (Object)frame.getString("type"));
            client.close();
            async.complete();
        });
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"register", (String)"echo", null, (WriteStream)socket);
            FrameHelper.sendFrame((String)"ping", (WriteStream)socket);
        }));
    }

    @Test
    public void testNoAddress(TestContext context) {
        NetClient client = this.vertx.createNetClient();
        Async async = context.async();
        AtomicBoolean errorOnce = new AtomicBoolean(false);
        FrameParser parser = new FrameParser(parse -> {
            context.assertTrue(parse.succeeded());
            JsonObject frame = (JsonObject)parse.result();
            if (!errorOnce.compareAndSet(false, true)) {
                context.fail("Client gets error message twice!");
            } else {
                context.assertEquals((Object)"err", (Object)frame.getString("type"));
                context.assertEquals((Object)"missing_address", (Object)frame.getString("message"));
                this.vertx.setTimer(200L, l -> {
                    client.close();
                    async.complete();
                });
            }
        });
        client.connect(7000, "localhost", context.asyncAssertSuccess(socket -> {
            socket.handler((Handler)parser);
            FrameHelper.sendFrame((String)"send", (WriteStream)socket);
        }));
    }
}

