package org.neo4j.causalclustering.catchup.storecopy;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.File;
import java.io.IOException;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.OpenOption;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.compress.utils.Charsets;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.catchup.CatchupAddressResolutionException;
import org.neo4j.causalclustering.catchup.CatchupClientBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerBuilder;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyFinishedResponse;
import org.neo4j.causalclustering.helper.ConstantTimeTimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.collection.primitive.base.Empty;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.io.fs.DefaultFileSystemAbstraction;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PagedFile;
import org.neo4j.io.pagecache.impl.muninn.StandalonePageCacheFactory;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.DuplicatingLogProvider;
import org.neo4j.logging.FormattedLogProvider;
import org.neo4j.logging.Level;
import org.neo4j.logging.LogProvider;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.test.rule.TestDirectory;

/* loaded from: input_file:org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT.class */
public class StoreCopyClientIT {
    private StoreCopyClient subject;
    private Server catchupServer;
    private TestCatchupServerHandler serverHandler;
    private FileSystemAbstraction fsa = new DefaultFileSystemAbstraction();
    private final AssertableLogProvider assertableLogProvider = new AssertableLogProvider(true);
    private final LogProvider logProvider = new DuplicatingLogProvider(new LogProvider[]{this.assertableLogProvider, FormattedLogProvider.withDefaultLogLevel(Level.DEBUG).toOutputStream(System.out)});
    private final TerminationCondition defaultTerminationCondition = TerminationCondition.CONTINUE_INDEFINITELY;

    @Rule
    public TestDirectory testDirectory = TestDirectory.testDirectory(this.fsa);
    private FakeFile fileA = new FakeFile("fileA", "This is file a content");
    private FakeFile fileB = new FakeFile("another-file-b", "Totally different content 123");
    private FakeFile indexFileA = new FakeFile("lucene", "Lucene 123");
    private File targetLocation = new File("copyTargetLocation");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/causalclustering/catchup/storecopy/StoreCopyClientIT$Once.class */
    public static class Once implements TerminationCondition {
        private Once() {
        }

        public void assertContinue() throws StoreCopyFailedException {
            throw new StoreCopyFailedException("One try only");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void writeContents(FileSystemAbstraction fileSystemAbstraction, File file, String str) {
        byte[] bytes = str.getBytes();
        try {
            StoreChannel create = fileSystemAbstraction.create(file);
            Throwable th = null;
            try {
                create.write(ByteBuffer.wrap(bytes));
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Before
    public void setup() throws Throwable {
        this.serverHandler = new TestCatchupServerHandler(this.logProvider, this.testDirectory, this.fsa);
        this.serverHandler.addFile(this.fileA);
        this.serverHandler.addFile(this.fileB);
        this.serverHandler.addIndexFile(this.indexFileA);
        writeContents(this.fsa, relative(this.fileA.getFilename()), this.fileA.getContent());
        writeContents(this.fsa, relative(this.fileB.getFilename()), this.fileB.getContent());
        writeContents(this.fsa, relative(this.indexFileA.getFilename()), this.indexFileA.getContent());
        this.catchupServer = new CatchupServerBuilder(this.serverHandler).listenAddress(new ListenSocketAddress("localhost", PortAuthority.allocatePort())).build();
        this.catchupServer.start();
        CatchUpClient build = new CatchupClientBuilder().build();
        build.start();
        this.subject = new StoreCopyClient(build, new Monitors(), this.logProvider, new ConstantTimeTimeoutStrategy(1L, TimeUnit.MILLISECONDS));
    }

    @After
    public void shutdown() throws Throwable {
        this.catchupServer.stop();
    }

    @Test
    public void canPerformCatchup() throws StoreCopyFailedException, IOException {
        InMemoryStoreStreamProvider inMemoryStoreStreamProvider = new InMemoryStoreStreamProvider();
        this.subject.copyStoreFiles(CatchupAddressProvider.fromSingleAddress(from(this.catchupServer.address().getPort())), this.serverHandler.getStoreId(), inMemoryStoreStreamProvider, () -> {
            return this.defaultTerminationCondition;
        }, this.targetLocation);
        Assert.assertEquals(new HashSet(Arrays.asList(this.fileA.getFilename(), this.fileB.getFilename(), this.indexFileA.getFilename())), inMemoryStoreStreamProvider.fileStreams().keySet());
        Assert.assertEquals(fileContent(relative(this.fileA.getFilename())), clientFileContents(inMemoryStoreStreamProvider, this.fileA.getFilename()));
        Assert.assertEquals(fileContent(relative(this.fileB.getFilename())), clientFileContents(inMemoryStoreStreamProvider, this.fileB.getFilename()));
    }

    @Test
    public void failedFileCopyShouldRetry() throws StoreCopyFailedException, IOException {
        this.fileB.setRemainingFailed(2);
        InMemoryStoreStreamProvider inMemoryStoreStreamProvider = new InMemoryStoreStreamProvider();
        this.subject.copyStoreFiles(CatchupAddressProvider.fromSingleAddress(from(this.catchupServer.address().getPort())), this.serverHandler.getStoreId(), inMemoryStoreStreamProvider, () -> {
            return this.defaultTerminationCondition;
        }, this.targetLocation);
        Assert.assertEquals(new HashSet(Arrays.asList(this.fileA.getFilename(), this.fileB.getFilename(), this.indexFileA.getFilename())), inMemoryStoreStreamProvider.fileStreams().keySet());
        Assert.assertEquals(fileContent(relative(this.fileA.getFilename())), clientFileContents(inMemoryStoreStreamProvider, this.fileA.getFilename()));
        Assert.assertEquals(fileContent(relative(this.fileB.getFilename())), clientFileContents(inMemoryStoreStreamProvider, this.fileB.getFilename()));
        Assert.assertEquals(3L, this.serverHandler.getRequestCount(this.fileB.getFilename()));
        Assert.assertEquals(1L, this.serverHandler.getRequestCount(this.fileA.getFilename()));
    }

    @Test
    public void shouldNotAppendToFileWhenRetryingWithNewFile() throws Throwable {
        final String str = "foo";
        final String str2 = "bar";
        final Iterator it = Iterators.iterator(new String[]{"abcd", "abcdefgh"});
        TestCatchupServerHandler testCatchupServerHandler = new TestCatchupServerHandler(this.logProvider, this.testDirectory, this.fsa) { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.1
            @Override // org.neo4j.causalclustering.catchup.storecopy.TestCatchupServerHandler
            public ChannelHandler getStoreFileRequestHandler(final CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<GetStoreFileRequest>() { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.1.1
                    /* JADX INFO: Access modifiers changed from: protected */
                    public void channelRead0(ChannelHandlerContext channelHandlerContext, GetStoreFileRequest getStoreFileRequest) throws IOException {
                        File file = new File(str);
                        String str3 = (String) it.next();
                        StoreCopyClientIT.writeContents(StoreCopyClientIT.this.fsa, file, str3);
                        PageCache neverSupportingFileOperationPageCache = StoreCopyClientIT.this.neverSupportingFileOperationPageCache(StandalonePageCacheFactory.createPageCache(StoreCopyClientIT.this.fsa));
                        PagedFile map = neverSupportingFileOperationPageCache.map(new File(str2), neverSupportingFileOperationPageCache.pageSize(), new OpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});
                        WritableByteChannel openWritableByteChannel = map.openWritableByteChannel();
                        Throwable th = null;
                        try {
                            try {
                                openWritableByteChannel.write(ByteBuffer.wrap(str3.getBytes(Charsets.UTF_8)));
                                if (openWritableByteChannel != null) {
                                    if (0 != 0) {
                                        try {
                                            openWritableByteChannel.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        openWritableByteChannel.close();
                                    }
                                }
                                sendFile(channelHandlerContext, file, neverSupportingFileOperationPageCache);
                                sendFile(channelHandlerContext, map.file(), neverSupportingFileOperationPageCache);
                                new StoreFileStreamingProtocol().end(channelHandlerContext, it.hasNext() ? StoreCopyFinishedResponse.Status.E_UNKNOWN : StoreCopyFinishedResponse.Status.SUCCESS);
                                catchupServerProtocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
                            } finally {
                            }
                        } catch (Throwable th3) {
                            if (openWritableByteChannel != null) {
                                if (th != null) {
                                    try {
                                        openWritableByteChannel.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    openWritableByteChannel.close();
                                }
                            }
                            throw th3;
                        }
                    }

                    private void sendFile(ChannelHandlerContext channelHandlerContext, File file, PageCache pageCache) {
                        channelHandlerContext.write(ResponseMessageType.FILE);
                        channelHandlerContext.write(new FileHeader(file.getName()));
                        channelHandlerContext.writeAndFlush(new FileSender(new StoreResource(file, file.getName(), 16, pageCache, StoreCopyClientIT.this.fsa))).addListener(future -> {
                            StoreCopyClientIT.this.fsa.deleteFile(file);
                        });
                    }
                };
            }

            @Override // org.neo4j.causalclustering.catchup.storecopy.TestCatchupServerHandler
            public ChannelHandler storeListingRequestHandler(final CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<PrepareStoreCopyRequest>() { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.1.2
                    /* JADX INFO: Access modifiers changed from: protected */
                    public void channelRead0(ChannelHandlerContext channelHandlerContext, PrepareStoreCopyRequest prepareStoreCopyRequest) {
                        channelHandlerContext.write(ResponseMessageType.PREPARE_STORE_COPY_RESPONSE);
                        channelHandlerContext.writeAndFlush(PrepareStoreCopyResponse.success(new File[]{new File(str)}, new Empty.EmptyPrimitiveLongSet(), 1L));
                        catchupServerProtocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
                    }
                };
            }

            @Override // org.neo4j.causalclustering.catchup.storecopy.TestCatchupServerHandler
            public ChannelHandler getIndexSnapshotRequestHandler(CatchupServerProtocol catchupServerProtocol) {
                return new SimpleChannelInboundHandler<GetIndexFilesRequest>() { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.1.3
                    /* JADX INFO: Access modifiers changed from: protected */
                    public void channelRead0(ChannelHandlerContext channelHandlerContext, GetIndexFilesRequest getIndexFilesRequest) {
                        throw new IllegalStateException("There should not be any index requests");
                    }
                };
            }
        };
        Server server = null;
        try {
            ListenSocketAddress listenSocketAddress = new ListenSocketAddress("localhost", PortAuthority.allocatePort());
            server = new CatchupServerBuilder(testCatchupServerHandler).listenAddress(listenSocketAddress).build();
            server.start();
            CatchupAddressProvider fromSingleAddress = CatchupAddressProvider.fromSingleAddress(new AdvertisedSocketAddress(listenSocketAddress.getHostname(), listenSocketAddress.getPort()));
            StoreId storeId = testCatchupServerHandler.getStoreId();
            File makeGraphDbDir = this.testDirectory.makeGraphDbDir();
            PageCache createPageCache = StandalonePageCacheFactory.createPageCache(this.fsa);
            this.subject.copyStoreFiles(fromSingleAddress, storeId, new StreamToDiskProvider(makeGraphDbDir, this.fsa, createPageCache, new Monitors()), () -> {
                return this.defaultTerminationCondition;
            }, this.targetLocation);
            Assert.assertEquals(fileContent(new File(makeGraphDbDir, "foo")), "abcdefgh");
            PagedFile map = createPageCache.map(new File(makeGraphDbDir, "bar"), createPageCache.pageSize(), new OpenOption[]{StandardOpenOption.READ});
            ByteBuffer wrap = ByteBuffer.wrap(new byte["abcdefgh".length()]);
            ReadableByteChannel openReadableByteChannel = map.openReadableByteChannel();
            Throwable th = null;
            try {
                try {
                    openReadableByteChannel.read(wrap);
                    if (openReadableByteChannel != null) {
                        if (0 != 0) {
                            try {
                                openReadableByteChannel.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openReadableByteChannel.close();
                        }
                    }
                    Assert.assertEquals("abcdefgh", new String(wrap.array(), Charsets.UTF_8));
                    server.stop();
                    server.shutdown();
                } finally {
                }
            } finally {
            }
        } catch (Throwable th3) {
            server.stop();
            server.shutdown();
            throw th3;
        }
    }

    @Test
    public void shouldLogConnetionRefusedMessage() {
        InMemoryStoreStreamProvider inMemoryStoreStreamProvider = new InMemoryStoreStreamProvider();
        final int allocatePort = PortAuthority.allocatePort();
        try {
            this.subject.copyStoreFiles(new CatchupAddressProvider() { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.2
                public AdvertisedSocketAddress primary() {
                    return StoreCopyClientIT.from(StoreCopyClientIT.this.catchupServer.address().getPort());
                }

                public AdvertisedSocketAddress secondary() {
                    return new AdvertisedSocketAddress("localhost", allocatePort);
                }
            }, this.serverHandler.getStoreId(), inMemoryStoreStreamProvider, () -> {
                return new Once();
            }, this.targetLocation);
            Assert.fail();
        } catch (StoreCopyFailedException e) {
            this.assertableLogProvider.assertContainsExactlyOneMessageMatching(CoreMatchers.both(CoreMatchers.startsWith("Connection refused:")).and(CoreMatchers.containsString("localhost/127.0.0.1:" + allocatePort)));
        }
    }

    @Test
    public void shouldLogUpstreamIssueMessage() {
        InMemoryStoreStreamProvider inMemoryStoreStreamProvider = new InMemoryStoreStreamProvider();
        final CatchupAddressResolutionException catchupAddressResolutionException = new CatchupAddressResolutionException(new MemberId(UUID.randomUUID()));
        try {
            this.subject.copyStoreFiles(new CatchupAddressProvider() { // from class: org.neo4j.causalclustering.catchup.storecopy.StoreCopyClientIT.3
                public AdvertisedSocketAddress primary() {
                    return StoreCopyClientIT.from(StoreCopyClientIT.this.catchupServer.address().getPort());
                }

                public AdvertisedSocketAddress secondary() throws CatchupAddressResolutionException {
                    throw catchupAddressResolutionException;
                }
            }, this.serverHandler.getStoreId(), inMemoryStoreStreamProvider, () -> {
                return new Once();
            }, this.targetLocation);
            Assert.fail();
        } catch (StoreCopyFailedException e) {
            this.assertableLogProvider.assertContainsExactlyOneMessageMatching(CoreMatchers.startsWith("Unable to resolve address for"));
            this.assertableLogProvider.assertLogStringContains(catchupAddressResolutionException.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PageCache neverSupportingFileOperationPageCache(PageCache pageCache) {
        PageCache pageCache2 = (PageCache) Mockito.spy(pageCache);
        Mockito.when(Boolean.valueOf(pageCache2.fileSystemSupportsFileOperations())).thenReturn(false);
        return pageCache2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static AdvertisedSocketAddress from(int i) {
        return new AdvertisedSocketAddress("localhost", i);
    }

    private File relative(String str) {
        return this.testDirectory.file(str);
    }

    private String fileContent(File file) throws IOException {
        return fileContent(file, this.fsa);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String fileContent(File file, FileSystemAbstraction fileSystemAbstraction) throws IOException {
        StringBuilder sb = new StringBuilder();
        Reader openAsReader = fileSystemAbstraction.openAsReader(file, Charsets.UTF_8);
        Throwable th = null;
        try {
            try {
                CharBuffer wrap = CharBuffer.wrap(new char[128]);
                while (openAsReader.read(wrap) != -1) {
                    wrap.flip();
                    sb.append((CharSequence) wrap);
                    wrap.clear();
                }
                if (openAsReader != null) {
                    if (0 != 0) {
                        try {
                            openAsReader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        openAsReader.close();
                    }
                }
                return sb.toString();
            } finally {
            }
        } catch (Throwable th3) {
            if (openAsReader != null) {
                if (th != null) {
                    try {
                        openAsReader.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openAsReader.close();
                }
            }
            throw th3;
        }
    }

    private String clientFileContents(InMemoryStoreStreamProvider inMemoryStoreStreamProvider, String str) {
        return inMemoryStoreStreamProvider.fileStreams().get(str).toString();
    }
}
