package com.datastax.driver.core;

import com.google.common.util.concurrent.Uninterruptibles;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.log4j.Level;
import org.scassandra.http.client.PrimingRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:com/datastax/driver/core/HeartbeatTest.class */
/**
 * Tests the connection heartbeat by capturing the DEBUG output of the {@link Connection} logger
 * and looking for the "sending heartbeat" / "heartbeat query succeeded" messages.
 *
 * <p>All tests are in the "long" group because they rely on real-time sleeps around the
 * configured heartbeat interval.
 */
public class HeartbeatTest extends ScassandraTestBase {
    static final Logger logger = LoggerFactory.getLogger(HeartbeatTest.class);

    /** Log fragment searched for to detect that a heartbeat was sent. */
    private static final String SENDING_HEARTBEAT = "sending heartbeat";

    /** Log fragment searched for to detect that a heartbeat got a response. */
    private static final String HEARTBEAT_SUCCEEDED = "heartbeat query succeeded";

    /** Splits captured log output into individual lines. */
    private static final String LINE_SEPARATOR_REGEX = "\\r?\\n";

    final org.apache.log4j.Logger connectionLogger = org.apache.log4j.Logger.getLogger(Connection.class);
    MemoryAppender logs;
    Level originalLevel;

    /**
     * Background task that fires a "ping" query once per second without waiting for the result,
     * until the thread running it is interrupted.
     */
    private static class QuerySubmitter implements Runnable {
        private final Session session;

        QuerySubmitter(Session session) {
            this.session = session;
        }

        @Override
        public void run() {
            // The sleep is uninterruptible, so an interrupt is only noticed here, at the
            // top of the next iteration.
            while (!Thread.currentThread().isInterrupted()) {
                HeartbeatTest.logger.debug("Sending ping, for which we expect no response");
                this.session.executeAsync("ping");
                Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
            }
        }
    }

    /** Raises the {@link Connection} logger to DEBUG and attaches an in-memory appender to it. */
    @BeforeMethod(groups = {"long"})
    public void startCapturingLogs() {
        this.originalLevel = this.connectionLogger.getLevel();
        this.connectionLogger.setLevel(Level.DEBUG);
        // "%t" puts the thread name at the start of each line; getLog() filters on it.
        this.logs = new MemoryAppender("%t - %m%n");
        this.connectionLogger.addAppender(this.logs);
    }

    /** Restores the {@link Connection} logger level and detaches the in-memory appender. */
    @AfterMethod(groups = {"long"}, alwaysRun = true)
    public void stopCapturingLogs() {
        this.connectionLogger.setLevel(this.originalLevel);
        this.connectionLogger.removeAppender(this.logs);
    }

    /**
     * Builds (without initializing) a cluster pointing at the Scassandra instance of this test.
     *
     * @param poolingOptions the pooling options carrying the heartbeat configuration under test
     * @return a new cluster; the caller is responsible for closing it
     */
    private Cluster newCluster(PoolingOptions poolingOptions) {
        InetAddress contactPoint = this.hostAddress.getAddress();
        return Cluster.builder()
                .addContactPoints(contactPoint)
                .withPort(this.scassandra.getBinaryPort())
                .withPoolingOptions(poolingOptions)
                .build();
    }

    /**
     * Reads the next chunk of captured log output and keeps only the lines whose thread name
     * starts with {@code <clusterName>-nio-worker}.
     *
     * @param cluster the cluster whose worker-thread lines should be kept
     * @return the retained lines, each terminated by {@code "\n"} (empty if none matched)
     */
    private String getLog(Cluster cluster) {
        String workerPrefix = cluster.getClusterName() + "-nio-worker";
        StringBuilder kept = new StringBuilder();
        for (String line : this.logs.getNext().split(LINE_SEPARATOR_REGEX)) {
            if (line.startsWith(workerPrefix)) {
                kept.append(line).append("\n");
            }
        }
        return kept.toString();
    }

    /**
     * Issues one request per second on the control connection for the given number of seconds.
     *
     * @param cluster the cluster whose control connection is exercised
     * @param seconds how many request/sleep rounds to perform
     * @throws InterruptedException if interrupted while sleeping between requests
     */
    private void keepControlConnectionBusy(Cluster cluster, int seconds) throws InterruptedException {
        for (int i = 0; i < seconds; i++) {
            triggerRequestOnControlConnection(cluster);
            TimeUnit.SECONDS.sleep(1L);
        }
    }

    /**
     * With a 3 second heartbeat interval: no heartbeat is expected while the control connection
     * is used every second, and one is expected after 4 seconds without requests.
     */
    @Test(groups = {"long"})
    public void should_send_heartbeat_when_connection_is_inactive() throws InterruptedException {
        Cluster cluster = newCluster(new PoolingOptions().setHeartbeatIntervalSeconds(3));
        try {
            cluster.init();

            // Regular activity: no heartbeat expected.
            keepControlConnectionBusy(cluster, 5);
            Assertions.assertThat(getLog(cluster)).doesNotContain(SENDING_HEARTBEAT);

            // Idle for longer than the interval: heartbeat expected.
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat(getLog(cluster)).contains(SENDING_HEARTBEAT).contains(HEARTBEAT_SUCCEEDED);

            // Still idle: a further heartbeat expected.
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat(getLog(cluster)).contains(SENDING_HEARTBEAT).contains(HEARTBEAT_SUCCEEDED);

            // Result intentionally ignored. NOTE(review): presumably this drains anything logged
            // since the previous read so the next assertion starts clean -- confirm against
            // MemoryAppender.getNext().
            getLog(cluster);

            // Activity resumes: heartbeats should stop again...
            keepControlConnectionBusy(cluster, 5);
            Assertions.assertThat(getLog(cluster)).doesNotContain(SENDING_HEARTBEAT);

            // ...and come back once the connection goes idle.
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat(getLog(cluster)).contains(SENDING_HEARTBEAT).contains(HEARTBEAT_SUCCEEDED);
        } finally {
            cluster.close();
        }
    }

    /**
     * Fails unless at least one line of {@code str} fully matches {@code pattern}.
     *
     * @param str the multi-line text to inspect
     * @param pattern the pattern a whole line must match
     */
    private void assertLineMatches(String str, Pattern pattern) {
        for (String line : str.split(LINE_SEPARATOR_REGEX)) {
            if (pattern.matcher(line).matches()) {
                return;
            }
        }
        org.assertj.core.api.Assertions.fail("Expecting: [" + str + "] to contain " + pattern);
    }

    /**
     * Fails if any line of {@code str} fully matches {@code pattern}.
     *
     * @param str the multi-line text to inspect
     * @param pattern the pattern no whole line may match
     */
    private void assertNoLineMatches(String str, Pattern pattern) {
        for (String line : str.split(LINE_SEPARATOR_REGEX)) {
            if (pattern.matcher(line).matches()) {
                org.assertj.core.api.Assertions.fail("Expecting: [" + str + "] not to contain " + pattern);
            }
        }
    }

    /**
     * With a single connection per host and a "ping" query primed with a very long delay:
     * while "bar" queries are executed every second no heartbeat is expected on that connection;
     * once only the unanswered pings are being written, a heartbeat is expected.
     */
    @Test(groups = {"long"})
    public void should_send_heartbeat_when_requests_being_written_but_nothing_received() throws Exception {
        // Prime before creating the cluster so a priming failure cannot leave a cluster unclosed.
        this.scassandra.primingClient().prime(PrimingRequest.queryBuilder().withQuery("ping").withThen(PrimingRequest.then().withFixedDelay(8675309999L)));
        Cluster cluster = newCluster(new PoolingOptions().setHeartbeatIntervalSeconds(3).setConnectionsPerHost(HostDistance.LOCAL, 1, 1));
        Thread submitter = null;
        try {
            cluster.init();
            SessionManager session = cluster.connect();
            Connection connection = (Connection) ((HostConnectionPool) session.pools.get(TestUtils.findHost(cluster, 1))).connections.get(0);

            // Reduce the connection's toString() to the part between "Connection[/" and
            // ", inFlight", then quote it so every character is matched literally.
            String connectionName = connection.toString().replaceAll("Connection\\[\\/", "").replaceAll("\\, inFlight.*", "");
            String quotedName = Pattern.quote(connectionName);
            Pattern heartbeatSent = Pattern.compile(".*" + quotedName + ".*" + SENDING_HEARTBEAT);
            Pattern heartbeatSucceeded = Pattern.compile(".*" + quotedName + ".*" + HEARTBEAT_SUCCEEDED);
            logger.debug("Heartbeat pattern is {}", heartbeatSent);

            submitter = new Thread(new QuerySubmitter(session));
            submitter.start();

            // Answered queries keep the connection reading: no heartbeat expected.
            for (int i = 0; i < 5; i++) {
                session.execute("bar");
                TimeUnit.SECONDS.sleep(1L);
            }
            assertNoLineMatches(getLog(cluster), heartbeatSent);

            int inFlightBefore = connection.inFlight.get();
            Assertions.assertThat(inFlightBefore).isGreaterThan(0);

            // Only the delayed pings are being written now; in-flight count must keep growing
            // and a heartbeat must have been sent and answered.
            TimeUnit.SECONDS.sleep(4L);
            Assertions.assertThat(connection.inFlight.get()).isGreaterThan(inFlightBefore);
            String log = getLog(cluster);
            assertLineMatches(log, heartbeatSent);
            assertLineMatches(log, heartbeatSucceeded);
        } finally {
            // Single cleanup path, run exactly once whether the test passed or failed.
            if (submitter != null) {
                submitter.interrupt();
            }
            cluster.close();
        }
    }

    /**
     * With the heartbeat interval set to 0, no heartbeat is expected either during activity
     * or after 32 seconds without requests.
     */
    @Test(groups = {"long"})
    public void should_not_send_heartbeat_when_disabled() throws InterruptedException {
        Cluster cluster = newCluster(new PoolingOptions().setHeartbeatIntervalSeconds(0));
        try {
            cluster.init();
            keepControlConnectionBusy(cluster, 5);
            Assertions.assertThat(getLog(cluster)).doesNotContain(SENDING_HEARTBEAT);
            TimeUnit.SECONDS.sleep(32L);
            Assertions.assertThat(getLog(cluster)).doesNotContain(SENDING_HEARTBEAT);
        } finally {
            cluster.close();
        }
    }

    /** Forces a request on the control connection by refreshing the node info of host 1. */
    private void triggerRequestOnControlConnection(Cluster cluster) {
        cluster.manager.controlConnection.refreshNodeInfo(TestUtils.findHost(cluster, 1));
    }
}
