package net.openhft.chronicle.tcp;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.IndexedChronicle;
import net.openhft.chronicle.VanillaChronicle;
import net.openhft.lang.model.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp.class */
public abstract class SourceTcp {
    protected final Logger logger;
    protected final String name;
    protected final ChronicleQueueBuilder.ReplicaChronicleQueueBuilder builder;
    protected final ThreadPoolExecutor executor;
    protected Object notifier = null;
    protected final AtomicBoolean running = new AtomicBoolean(false);

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$IndexedSessionHandler.class */
    private class IndexedSessionHandler extends SessionHandler {
        private long index;

        /**
         * Creates a session handler for an indexed chronicle.
         *
         * <p>The replication index starts at {@code -1} and is replaced by whatever
         * value a later subscribe request supplies.
         *
         * @param socketChannel the client channel, handed to the base handler
         */
        private IndexedSessionHandler(@NotNull SocketChannel socketChannel) {
            super(socketChannel);
            index = -1L;
        }

        /**
         * Handles a subscribe request: records the starting index, acknowledges it to the
         * peer and starts watching the key for writes as well as reads.
         *
         * @param selectionKey the key of the subscribing connection
         * @param j            the requested index; {@code -2} is replaced by the index the
         *                     tailer reports after {@code toEnd()}, any other value
         *                     (including {@code -1}) is used as received
         * @return always {@code true}
         * @throws IOException if the acknowledgement cannot be written
         */
        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException {
            index = j;
            if (j == -2L) {
                index = tailer.toEnd().index();
            }

            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, index);

            // OP_READ | OP_WRITE is the same interest set as the literal 5.
            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return true;
        }

        /**
         * Sends the excerpt at {@code index} to the peer, batching further excerpts into
         * the same buffer when they fit.
         *
         * <p>Each excerpt is framed as a 4-byte int (the excerpt capacity), an 8-byte long
         * (the tailer index) and then the payload, which is where the constant 12 comes from.
         *
         * @return {@code false} when the source is still running and no excerpt could be
         *         positioned at {@code index} even after pausing; {@code true} once data
         *         has been written
         * @throws IOException  if writing to the connection fails
         * @throws EOFException if the write buffer still has unsent bytes after the write
         */
        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean write() throws IOException {
            // Could not position on the current index: deal with padding, wait, then retry once.
            if (!this.tailer.index(this.index)) {
                if (this.tailer.wasPadding()) {
                    // Padding is reported to the peer (non-negative indices only) and stepped over.
                    if (this.index >= 0) {
                        sendSizeAndIndex(ChronicleTcp.PADDED_LEN, this.tailer.index());
                    }
                    this.index++;
                }
                pause();
                // NOTE(review): when running is false the retry is short-circuited and execution
                // falls through to the send path regardless of the tailer position - confirm intended.
                if (SourceTcp.this.running.get() && !this.tailer.index(this.index)) {
                    return false;
                }
            }
            pauseReset();
            long capacity = this.tailer.capacity();
            // Bytes still to send for this excerpt: payload plus the 12-byte header.
            long j = capacity + 12;
            this.writeBuffer.clear();
            this.writeBuffer.putInt((int) capacity);
            this.writeBuffer.putLong(this.tailer.index());
            if (capacity > this.writeBuffer.capacity() / 2) {
                // Large excerpt: sent on its own, in buffer-sized pieces, with no batching.
                // NOTE(review): the buffer is not cleared between iterations, so a second pass
                // depends on the position/limit left behind by writeAll - verify behaviour for
                // excerpts larger than the buffer.
                while (j > 0) {
                    this.writeBuffer.limit((int) Math.min(j, this.writeBuffer.capacity()));
                    this.tailer.read(this.writeBuffer);
                    this.writeBuffer.flip();
                    j -= this.writeBuffer.remaining();
                    this.connection.writeAll(this.writeBuffer);
                }
            } else {
                // Small excerpt: copy it, then append following excerpts while they fit.
                this.writeBuffer.limit((int) j);
                this.tailer.read(this.writeBuffer);
                int maxExcerptsPerMessage = SourceTcp.this.builder.maxExcerptsPerMessage();
                while (maxExcerptsPerMessage > 0 && this.tailer.index(this.index + 1)) {
                    if (!this.tailer.wasPadding()) {
                        if (!hasRoomForExcerpt(this.writeBuffer, this.tailer)) {
                            // index is left on the last excerpt copied; the increment after the
                            // write moves it onto the one that did not fit.
                            break;
                        }
                        int capacity2 = (int) this.tailer.capacity();
                        this.writeBuffer.limit(this.writeBuffer.position() + capacity2 + 12);
                        this.writeBuffer.putInt(capacity2);
                        this.writeBuffer.putLong(this.tailer.index());
                        this.tailer.read(this.writeBuffer);
                        this.index++;
                        maxExcerptsPerMessage--;
                    } else {
                        // Padding inside a batch is skipped and does not count against the limit.
                        this.index++;
                    }
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }
            // Anything left in the buffer after the write is treated as a failed send.
            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.index);
            }
            // Step past the last excerpt handled in this call.
            this.index++;
            return true;
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$SessionHandler.class */
    private abstract class SessionHandler implements Runnable, Closeable {
        private final SocketChannel socketChannel;
        private long lastUnpausedNS;
        private long pauseWait;
        protected final TcpConnection connection;
        protected ExcerptTailer tailer;
        protected long lastHeartbeat;
        protected final ByteBuffer writeBuffer;
        protected final ByteBuffer readBuffer;

        private SessionHandler(@NotNull SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            this.connection = new TcpConnection(socketChannel);
            this.tailer = null;
            this.lastHeartbeat = 0L;
            this.lastUnpausedNS = 0L;
            this.pauseWait = SourceTcp.this.builder.heartbeatIntervalUnit().toMillis(SourceTcp.this.builder.heartbeatInterval()) / 2;
            this.readBuffer = ChronicleTcp.createBufferOfSize(16);
            this.writeBuffer = ChronicleTcp.createBuffer(SourceTcp.this.builder.minBufferSize());
            this.readBuffer.clear();
            this.readBuffer.limit(16);
            this.writeBuffer.clear();
            this.writeBuffer.limit(0);
        }

        /**
         * Releases the tailer (if one was created) and closes the socket channel if it
         * is still open.
         *
         * <p>The channel is closed in a {@code finally} block so that a failure while
         * closing the tailer cannot leave the socket open. The tailer reference is
         * cleared before it is closed, so a repeated call does not close it twice.
         *
         * @throws IOException if closing the socket channel fails
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            final ExcerptTailer currentTailer = this.tailer;
            this.tailer = null;
            try {
                if (currentTailer != null) {
                    currentTailer.close();
                }
            } finally {
                if (this.socketChannel.isOpen()) {
                    this.socketChannel.close();
                }
            }
        }

        /**
         * Serves one client connection until the source stops running or the connection fails.
         *
         * <p>The channel is switched to non-blocking mode and its socket options are set.
         * It is then registered for reads with a new selector, a tailer is created, and
         * control passes to {@link #vanillaNioLoop} when the selector exposes a
         * {@code VanillaSelectionKeySet}, or to {@link #nioLoop} otherwise.
         *
         * <p>Failures are logged only while the source is still running. Messages that
         * contain "reset by peer", "Broken pipe" or "was aborted by" are logged as a
         * close from the other end, without a stack trace.
         *
         * <p>Whatever happens, the key set (if one was obtained) is cleaned up and the
         * handler is closed before the method returns or an error propagates.
         */
        @Override // java.lang.Runnable
        public void run() {
            VanillaSelectionKeySet selectionKeys = null;
            try {
                this.socketChannel.configureBlocking(false);
                this.socketChannel.socket().setSendBufferSize(SourceTcp.this.builder.minBufferSize());
                this.socketChannel.socket().setTcpNoDelay(true);
                this.socketChannel.socket().setSoTimeout(0);
                this.socketChannel.socket().setSoLinger(false, 0);

                final VanillaSelector selector =
                        new VanillaSelector().open().register(this.socketChannel, SelectionKey.OP_READ);
                this.tailer = SourceTcp.this.builder.chronicle().createTailer();

                // Assigned inside the try so the finally block can clean it up on every exit path.
                selectionKeys = selector.vanillaSelectionKeys();
                if (selectionKeys != null) {
                    vanillaNioLoop(selector, selectionKeys);
                } else {
                    nioLoop(selector);
                }
            } catch (EOFException e) {
                if (SourceTcp.this.running.get()) {
                    SourceTcp.this.logger.info("Connection {} died", this.socketChannel);
                }
            } catch (Exception e) {
                if (SourceTcp.this.running.get()) {
                    final String message = e.getMessage();
                    final boolean closedByPeer = message != null
                            && (message.contains("reset by peer")
                                || message.contains("Broken pipe")
                                || message.contains("was aborted by"));
                    if (closedByPeer) {
                        // The second placeholder was missing, so the message argument was never printed.
                        SourceTcp.this.logger.info("Connection {} closed from the other end: {}", this.socketChannel, message);
                    } else {
                        SourceTcp.this.logger.info("Connection {} died", this.socketChannel, e);
                    }
                }
            } finally {
                try {
                    if (selectionKeys != null) {
                        selectionKeys.cleanup();
                    }
                } finally {
                    try {
                        close();
                    } catch (IOException e) {
                        SourceTcp.this.logger.warn("", e);
                    }
                }
            }
        }

        /**
         * Select loop used when the selector exposes a {@code VanillaSelectionKeySet}.
         *
         * <p>While the source is running, each successful select flips the key set and
         * dispatches the returned keys in order. Dispatch stops at the first {@code null}
         * slot or the first key whose handler returns {@code false}; the flipped array is
         * then handed back for cleanup.
         *
         * @param vanillaSelector        the selector to poll
         * @param vanillaSelectionKeySet the key set backing that selector
         * @throws IOException if selecting or handling a key fails
         */
        private void vanillaNioLoop(VanillaSelector vanillaSelector, VanillaSelectionKeySet vanillaSelectionKeySet) throws IOException {
            final int spinCount = SourceTcp.this.builder.selectorSpinLoopCount();
            final long timeout = SourceTcp.this.builder.selectTimeout();

            while (SourceTcp.this.running.get()) {
                if (vanillaSelector.select(spinCount, timeout) <= 0) {
                    continue;
                }

                final SelectionKey[] readyKeys = vanillaSelectionKeySet.flip();
                for (SelectionKey key : readyKeys) {
                    if (key == null || !onSelectionKey(key)) {
                        break;
                    }
                }
                vanillaSelectionKeySet.cleanup(readyKeys);
            }
        }

        /**
         * Select loop used when only the standard selected-key set is available.
         *
         * <p>While the source is running, each successful select dispatches the selected
         * keys until one handler returns {@code false}, then clears the set.
         *
         * @param vanillaSelector the selector to poll
         * @throws IOException if selecting or handling a key fails
         */
        private void nioLoop(VanillaSelector vanillaSelector) throws IOException {
            final int spinCount = SourceTcp.this.builder.selectorSpinLoopCount();
            final long timeout = SourceTcp.this.builder.selectTimeout();

            while (SourceTcp.this.running.get()) {
                if (vanillaSelector.select(spinCount, timeout) <= 0) {
                    continue;
                }

                final Set<SelectionKey> selected = vanillaSelector.selectionKeys();
                for (SelectionKey key : selected) {
                    if (!onSelectionKey(key)) {
                        break;
                    }
                }
                selected.clear();
            }
        }

        /**
         * Tells whether the tailer's current excerpt, plus 12 bytes of framing, is strictly
         * smaller than the space between the buffer's position and its capacity.
         *
         * @param byteBuffer    the buffer being filled
         * @param excerptTailer the tailer whose current excerpt capacity is checked
         * @return {@code true} if the framed excerpt is smaller than the free space
         */
        protected boolean hasRoomForExcerpt(ByteBuffer byteBuffer, ExcerptTailer excerptTailer) {
            final long framedSize = excerptTailer.capacity() + 12;
            final long freeSpace = byteBuffer.capacity() - byteBuffer.position();
            return framedSize < freeSpace;
        }

        /**
         * Records the current {@link System#nanoTime()} as the start of the busy-wait
         * window that {@link #pause()} checks.
         */
        protected void pauseReset() {
            final long now = System.nanoTime();
            lastUnpausedNS = now;
        }

        /**
         * Waits on the source's notifier for up to {@code pauseWait} milliseconds, unless
         * the busy-wait window ({@code ChronicleTcp.BUSY_WAIT_TIME_NS} after the last
         * {@link #pauseReset()}) has not yet elapsed, in which case it returns immediately.
         */
        protected void pause() {
            final boolean withinBusyWaitWindow =
                    lastUnpausedNS + ChronicleTcp.BUSY_WAIT_TIME_NS > System.nanoTime();
            if (!withinBusyWaitWindow) {
                try {
                    synchronized (SourceTcp.this.notifier) {
                        SourceTcp.this.notifier.wait(pauseWait);
                    }
                } catch (InterruptedException ignored) {
                    // An interrupt only ends the wait early.
                    // NOTE(review): the interrupt flag is not restored - confirm this is intended.
                }
            }
        }

        /**
         * Sets {@code lastHeartbeat} to the current wall-clock time plus the builder's
         * heartbeat interval in milliseconds.
         */
        protected void setLastHeartbeat() {
            final long now = System.currentTimeMillis();
            lastHeartbeat = now + SourceTcp.this.builder.heartbeatIntervalMillis();
        }

        /**
         * Sets {@code lastHeartbeat} to the given time plus the builder's heartbeat
         * interval in milliseconds.
         *
         * @param j the base time in milliseconds
         */
        protected void setLastHeartbeat(long j) {
            final long base = j;
            lastHeartbeat = base + SourceTcp.this.builder.heartbeatIntervalMillis();
        }

        /**
         * Writes a 12-byte frame (an int followed by a long) to the connection and then
         * pushes the heartbeat deadline forward.
         *
         * @param i the int written first (a size or one of the {@code ChronicleTcp} marker lengths)
         * @param j the long written second
         * @throws IOException if the frame cannot be written
         */
        protected void sendSizeAndIndex(int i, long j) throws IOException {
            final ByteBuffer frame = writeBuffer;
            frame.clear();
            frame.putInt(i).putLong(j);
            frame.flip();

            connection.writeAllOrEOF(frame);
            setLastHeartbeat();
        }

        /**
         * Dispatches a selected key: a readable key goes to {@link #onRead}, otherwise a
         * writable key goes to {@link #onWrite}. A key that is readable is never also
         * handled as writable in the same call.
         *
         * @param selectionKey the key to handle; may be {@code null}
         * @return the handler's result, or {@code true} when the key is {@code null} or
         *         neither readable nor writable
         * @throws IOException if the chosen handler fails
         */
        protected boolean onSelectionKey(SelectionKey selectionKey) throws IOException {
            if (selectionKey == null) {
                return true;
            }
            if (selectionKey.isReadable()) {
                return onRead(selectionKey);
            }
            if (selectionKey.isWritable()) {
                return onWrite(selectionKey);
            }
            return true;
        }

        /**
         * Reads one 16-byte request (two longs: an action code and its argument) and
         * dispatches it: action 1 to {@link #onSubscribe}, action 2 to {@link #onQuery}.
         *
         * <p>If an {@link EOFException} is raised anywhere in the read or the dispatched
         * handler, the key's selector is closed before the exception is rethrown.
         *
         * @param selectionKey the readable key
         * @return the result of the dispatched handler
         * @throws IOException if the read fails or the action code is not 1 or 2
         */
        protected boolean onRead(SelectionKey selectionKey) throws IOException {
            try {
                readBuffer.clear();
                connection.readFullyOrEOF(readBuffer);
                readBuffer.flip();

                final long action = readBuffer.getLong();
                final long argument = readBuffer.getLong();

                if (action == 1L) {
                    return onSubscribe(selectionKey, argument);
                } else if (action == 2L) {
                    return onQuery(selectionKey, argument);
                } else {
                    throw new IOException("Unknown action received (" + action + ")");
                }
            } catch (EOFException eof) {
                selectionKey.selector().close();
                throw eof;
            }
        }

        /**
         * Handles a writable key: attempts {@link #write()} and, when nothing was written
         * and the heartbeat deadline has passed, sends an {@code IN_SYNC_LEN} frame.
         * Nothing is done once the source has stopped running.
         *
         * @param selectionKey the writable key
         * @return always {@code true}
         * @throws IOException if writing fails
         */
        protected boolean onWrite(SelectionKey selectionKey) throws IOException {
            final long now = System.currentTimeMillis();

            // Evaluated left to right: write() is only attempted while running, and the
            // deadline is only consulted when write() produced nothing.
            final boolean heartbeatDue =
                    SourceTcp.this.running.get() && !write() && lastHeartbeat <= now;
            if (heartbeatDue) {
                sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
            }
            return true;
        }

        /**
         * Handles a query request for the excerpt following index {@code j}.
         *
         * <p>If the tailer cannot be positioned at {@code j}, an {@code IN_SYNC_LEN} frame
         * is sent. Otherwise the method polls {@code tailer.nextIndex()} until an excerpt
         * appears, and then replies with a {@code SYNC_IDX_LEN} frame carrying its index.
         * If the heartbeat deadline passes first, it replies with {@code IN_SYNC_LEN}.
         *
         * <p>The deadline is checked against a fresh clock reading on every iteration.
         * Comparing against the timestamp taken before the loop could never succeed for a
         * positive heartbeat interval, which left the loop unbounded.
         *
         * @param selectionKey the key of the querying connection
         * @param j            the index to query from
         * @return always {@code true}
         * @throws IOException if the reply cannot be written
         */
        protected boolean onQuery(SelectionKey selectionKey, long j) throws IOException {
            if (!this.tailer.index(j)) {
                sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
                return true;
            }

            // lastHeartbeat now holds the deadline: start time plus the heartbeat interval.
            setLastHeartbeat(System.currentTimeMillis());
            while (!this.tailer.nextIndex()) {
                if (this.lastHeartbeat <= System.currentTimeMillis()) {
                    sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
                    return true;
                }
            }

            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, this.tailer.index());
            return true;
        }

        /**
         * Handles a subscribe request; called from {@code onRead} when the action code is 1.
         *
         * @param selectionKey the key of the requesting connection
         * @param j            the second long of the request
         * @return the value {@code onRead} passes back to the select loop, which stops
         *         dispatching the current batch of keys on {@code false}
         * @throws IOException if replying to the peer fails
         */
        protected abstract boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException;

        /**
         * Attempts to send pending data to the peer; called from {@code onWrite} while the
         * source is running.
         *
         * @return {@code true} if data was sent; on {@code false}, {@code onWrite} sends an
         *         {@code IN_SYNC_LEN} frame once the heartbeat deadline has passed
         * @throws IOException if writing fails
         */
        protected abstract boolean write() throws IOException;
    }

    /* loaded from: input_file:net/openhft/chronicle/tcp/SourceTcp$VanillaSessionHandler.class */
    private class VanillaSessionHandler extends SessionHandler {
        private boolean nextIndex;
        private long index;

        /**
         * Creates a session handler for a vanilla chronicle.
         *
         * <p>The handler starts in "advance with {@code nextIndex()}" mode with its
         * index at {@code -1}.
         *
         * @param socketChannel the client channel, handed to the base handler
         */
        private VanillaSessionHandler(@NotNull SocketChannel socketChannel) {
            super(socketChannel);
            nextIndex = true;
            index = -1L;
        }

        /**
         * Handles a subscribe request: positions the tailer, acknowledges the resulting
         * index and starts watching the key for writes as well as reads.
         *
         * <ul>
         *   <li>{@code -1}: rewind to the start and advance with {@code nextIndex()}.</li>
         *   <li>{@code -2}: move to the end and resume from the index reported there; if
         *       that index is {@code -1}, fall back to the start.</li>
         *   <li>anything else: resume from exactly that index.</li>
         * </ul>
         *
         * @param selectionKey the key of the subscribing connection
         * @param j            the requested index or one of the markers above
         * @return always {@code false}
         * @throws IOException if the acknowledgement cannot be written
         */
        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean onSubscribe(SelectionKey selectionKey, long j) throws IOException {
            index = j;
            if (j == -1L) {
                restartFromBeginning();
            } else {
                nextIndex = false;
                if (j == -2L) {
                    tailer = tailer.toEnd();
                    index = tailer.index();
                    if (index == -1L) {
                        restartFromBeginning();
                    }
                }
            }

            sendSizeAndIndex(ChronicleTcp.SYNC_IDX_LEN, index);

            // OP_READ | OP_WRITE is the same interest set as the literal 5.
            selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            return false;
        }

        /** Rewinds the tailer to the start and switches to advancing with {@code nextIndex()}. */
        private void restartFromBeginning() {
            nextIndex = true;
            tailer = tailer.toStart();
            index = -1L;
        }

        /**
         * Sends the next excerpt to the peer, batching further excerpts into the same
         * buffer when they fit.
         *
         * <p>When {@code nextIndex} is {@code false} the tailer is first positioned on
         * {@code index}; otherwise it is advanced with {@code nextIndex()}, pausing once
         * if nothing is available. Each excerpt is framed as a 4-byte int (capacity), an
         * 8-byte long (index) and the payload, which is where the constant 12 comes from.
         *
         * <p>If a batched excerpt does not fit, the tailer has already moved onto it. Its
         * index is therefore stored and {@code nextIndex} is cleared, so the next call
         * re-positions on that excerpt rather than advancing past it; without this the
         * excerpt would never be sent.
         *
         * @return {@code false} when nothing could be sent; {@code true} once data has
         *         been written
         * @throws IOException  if writing to the connection fails
         * @throws EOFException if the write buffer still has unsent bytes after the write
         */
        @Override // net.openhft.chronicle.tcp.SourceTcp.SessionHandler
        protected boolean write() throws IOException {
            if (!this.nextIndex) {
                // Resume from an explicit index; from then on, advance sequentially.
                if (!this.tailer.index(this.index)) {
                    return false;
                }
                this.nextIndex = true;
            } else if (!this.tailer.nextIndex()) {
                pause();
                // NOTE(review): when running is false the retry is short-circuited and execution
                // falls through to the send path - confirm intended.
                if (SourceTcp.this.running.get() && !this.tailer.nextIndex()) {
                    return false;
                }
            }
            pauseReset();
            long capacity = this.tailer.capacity();
            // Bytes still to send for this excerpt: payload plus the 12-byte header.
            long j = capacity + 12;
            this.writeBuffer.clear();
            this.writeBuffer.putInt((int) capacity);
            this.writeBuffer.putLong(this.tailer.index());
            if (capacity > this.writeBuffer.capacity() / 2) {
                // Large excerpt: sent on its own, in buffer-sized pieces, with no batching.
                // NOTE(review): the buffer is not cleared between iterations, so a second pass
                // depends on the position/limit left behind by writeAll - verify behaviour for
                // excerpts larger than the buffer.
                while (j > 0) {
                    this.writeBuffer.limit((int) Math.min(j, this.writeBuffer.capacity()));
                    this.tailer.read(this.writeBuffer);
                    this.writeBuffer.flip();
                    j -= this.writeBuffer.remaining();
                    this.connection.writeAll(this.writeBuffer);
                }
            } else {
                // Small excerpt: copy it, then append following excerpts while they fit.
                this.writeBuffer.limit((int) j);
                this.tailer.read(this.writeBuffer);
                for (int maxExcerptsPerMessage = SourceTcp.this.builder.maxExcerptsPerMessage(); maxExcerptsPerMessage > 0 && this.tailer.nextIndex(); maxExcerptsPerMessage--) {
                    if (this.tailer.wasPadding()) {
                        throw new AssertionError("Entry should not be padding - remove");
                    }
                    if (!hasRoomForExcerpt(this.writeBuffer, this.tailer)) {
                        // The tailer already sits on the excerpt that did not fit: remember
                        // it so the next call re-reads it instead of skipping it.
                        this.index = this.tailer.index();
                        this.nextIndex = false;
                        break;
                    }
                    int capacity2 = (int) this.tailer.capacity();
                    this.writeBuffer.limit(this.writeBuffer.position() + capacity2 + 12);
                    this.writeBuffer.putInt(capacity2);
                    this.writeBuffer.putLong(this.tailer.index());
                    this.tailer.read(this.writeBuffer);
                }
                this.writeBuffer.flip();
                this.connection.writeAll(this.writeBuffer);
            }
            // Anything left in the buffer after the write is treated as a failed send.
            if (this.writeBuffer.remaining() > 0) {
                throw new EOFException("Failed to send index=" + this.tailer.index());
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceTcp(String str, ChronicleQueueBuilder.ReplicaChronicleQueueBuilder replicaChronicleQueueBuilder, ThreadPoolExecutor threadPoolExecutor) {
        this.builder = replicaChronicleQueueBuilder;
        this.name = ChronicleTcp.connectionName(str, this.builder.bindAddress(), this.builder.connectAddress());
        this.logger = LoggerFactory.getLogger(this.name);
        this.executor = threadPoolExecutor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the object that session handlers synchronize and wait on while pausing.
     *
     * @param obj the monitor object to use
     */
    public void notifier(Object obj) {
        notifier = obj;
    }

    /**
     * Returns the object that session handlers wait on while pausing.
     *
     * @return the current notifier, or {@code null} if none has been set
     */
    Object notifier() {
        return notifier;
    }

    /**
     * Marks the source as running and submits the handler created by
     * {@link #createHandler()} to the executor.
     *
     * @return the value of the running flag after the handler has been submitted
     */
    public boolean open() {
        running.set(true);

        final Runnable handler = createHandler();
        executor.execute(handler);

        return running.get();
    }

    /**
     * Stops the source: clears the running flag, shuts the executor down and waits up
     * to twice the builder's select timeout for its tasks to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the wait ends and the
     * interrupt flag is restored so that callers can still observe it.
     *
     * @return {@code true} if the running flag is clear on return
     */
    public boolean close() {
        this.running.set(false);
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(this.builder.selectTimeout() * 2, this.builder.selectTimeoutUnit());
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: hand it back to the caller's thread.
            Thread.currentThread().interrupt();
        }
        return !this.running.get();
    }

    /**
     * Returns the connection name computed in the constructor.
     *
     * @return this source's name
     */
    @Override
    public String toString() {
        return this.name;
    }

    /**
     * Creates the task that {@link #open()} submits to the executor.
     *
     * @return the runnable to execute
     */
    protected abstract Runnable createHandler();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the session handler that matches the builder's chronicle type.
     *
     * @param socketChannel the client channel the handler will serve
     * @return an indexed or vanilla session handler
     * @throws IllegalStateException if the builder has no chronicle, or the chronicle is
     *                               neither an {@code IndexedChronicle} nor a {@code VanillaChronicle}
     */
    public Runnable createSessionHandler(@NotNull SocketChannel socketChannel) {
        final Chronicle source = this.builder.chronicle();

        // instanceof is false for null, so the null case falls through to the throw below.
        if (source instanceof IndexedChronicle) {
            return new IndexedSessionHandler(socketChannel);
        }
        if (source instanceof VanillaChronicle) {
            return new VanillaSessionHandler(socketChannel);
        }

        final String reason = (source == null)
                ? "Chronicle can't be null"
                : "Chronicle must be Indexed or Vanilla";
        throw new IllegalStateException(reason);
    }
}
