package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.protocol.meta.concept.TopicAddress;
import com.baidu.bigpipe.transport.AbstractSessionSocketStream;
import com.baidu.bigpipe.transport.PipeRuntime;
import com.baidu.bigpipe.transport.PipeletInfo;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.TransportStrategy;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.baidu.bigpipe.transport.pub.AbstractNioSession;
import com.baidu.bigpipe.transport.pub.context.ReadContext;
import com.baidu.bigpipe.transport.pub.context.ReadState;
import com.baidu.bigpipe.transport.pub.context.WriteState;
import com.baidu.bigpipe.transport.pub.context.WriteTask;
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/baidu/bigpipe/transport/pub/AsynchronousPublisherImpl.class */
public class AsynchronousPublisherImpl extends AbstractNioPublisher implements AsynchronousPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousPublisherImpl.class);

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected void handleSelectorException(SelectionKey selectionKey) {
        this.sessionRuntime.hasError = true;
        super.safeCloseTcpConnect();
        this.pubStrategy.fastFailed(this.pipeRuntime.getSessionIdProvider());
        this.sessionRuntime.needOpenTcp = true;
        this.sessionRuntime.hasError = false;
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected WriteState write(SelectionKey selectionKey) {
        AbstractNioSession.AttachHolder attachHolder = (AbstractNioSession.AttachHolder) selectionKey.attachment();
        if (attachHolder == null || attachHolder.writeTask == null) {
            return WriteState.NoTask;
        }
        try {
            ((SocketChannel) selectionKey.channel()).write(attachHolder.writeTask.getBuf());
            if (attachHolder.writeTask.isReConnectAndNoSend()) {
                attachHolder.writeTask.setReConnectAndNoSend(false);
            }
            if (attachHolder.writeTask.getBuf().remaining() != 0) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("writing for logid {},session id is {} ", attachHolder.writeTask.getLogId(), Long.valueOf(attachHolder.writeTask.getSessionMessageId()));
                }
                return WriteState.Writing;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("write finish for logid {},session id is {} ", attachHolder.writeTask.getLogId(), Long.valueOf(attachHolder.writeTask.getSessionMessageId()));
            }
            attachHolder.writeTask = null;
            return WriteState.Finish;
        } catch (IOException e) {
            LOGGER.info("write", e);
            handleSelectorException(selectionKey);
            return WriteState.Error;
        }
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected ReadState read(SelectionKey selectionKey) {
        AbstractNioSession.AttachHolder attachHolder = (AbstractNioSession.AttachHolder) selectionKey.attachment();
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (attachHolder.rc == null) {
            attachHolder.rc = new ReadContext();
        }
        try {
            this.reciever.recieve(socketChannel, attachHolder.rc);
            if (attachHolder.rc.isEOF()) {
                LOGGER.info("recieve data with eof!!!");
                handleFastFailed(true);
                attachHolder.rc = null;
                attachHolder.writeTask = null;
                return ReadState.ReadEOF;
            }
            if (!attachHolder.rc.isComplete()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("reading for logid {},session id is {} ", attachHolder.writeTask.getLogId(), Long.valueOf(attachHolder.writeTask.getSessionMessageId()));
                }
                return ReadState.Reading;
            }
            attachHolder.rc.getBuf().flip();
            this.pubStrategy.finishPub(attachHolder.rc.getBuf(), this.pipeRuntime.getSessionIdProvider());
            attachHolder.rc = null;
            return ReadState.Finish;
        } catch (IOException e) {
            LOGGER.info("read", e);
            handleFastFailed(true);
            attachHolder.rc = null;
            attachHolder.writeTask = null;
            return ReadState.Error;
        }
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected WriteTask startNewTask() {
        long sessionMessageId = this.pipeRuntime.getSessionMessageId() + 1;
        WriteTask nextTask = this.pubStrategy.getNextTask(this.idGen, sessionMessageId, this.pipeRuntime.getSessionIdProvider().getSessionId(false), this.pipeRuntime.getTopicName());
        if (nextTask == null) {
            return null;
        }
        if (nextTask.getCount() == 0) {
            this.pipeRuntime.setSessionMessageId(sessionMessageId);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("new task messageid {}", Long.valueOf(nextTask.getSessionMessageId()));
            }
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("repeat task messageid {}", Long.valueOf(nextTask.getSessionMessageId()));
        }
        return nextTask;
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected void ensureTcp() {
        buildConnect(false, this.socketConf);
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected void configTask(WriteTask writeTask, boolean z) {
        if (z || this.pubStrategy.getCurrentTaskCount() != 1) {
            return;
        }
        writeTask.setReConnectAndNoSend(true);
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected void handleFastFailed(boolean z) {
        if (z) {
            this.sessionRuntime.hasError = true;
        }
        super.safeCloseTcpConnect();
        this.pubStrategy.fastFailed(this.pipeRuntime.getSessionIdProvider());
        if (z) {
            this.sessionRuntime.hasError = false;
        }
    }

    @Override // com.baidu.bigpipe.transport.pub.AbstractNioSession
    protected void handleTimeout() {
        if (this.pubStrategy.handlePubTimeout(this.pipeRuntime.getSessionIdProvider())) {
            LOGGER.info("send time out, and close socket....");
            safeCloseTcpConnect();
            this.sessionRuntime.needOpenTcp = true;
        }
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected SessionSocketStream openStream(TopicAddress topicAddress, SocketConf socketConf) throws IOException {
        this.tcpConnect = SocketChannel.open();
        LOGGER.warn("stripe address--[{}:{}]", topicAddress.getAddress().getHostName(), Integer.valueOf(topicAddress.getAddress().getPort()));
        this.tcpConnect.configureBlocking(true);
        this.tcpConnect.socket().connect(topicAddress.getAddress(), socketConf.getConectTimeout());
        this.tcpConnect.socket().setSoTimeout(socketConf.getIoTimeout());
        return new AbstractSessionSocketStream() { // from class: com.baidu.bigpipe.transport.pub.AsynchronousPublisherImpl.1
            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream, com.baidu.bigpipe.transport.SessionSocketStream
            public void connectSession(PipeRuntime pipeRuntime, PipeletInfo pipeletInfo) throws IOException {
                super.connectSession(pipeRuntime, pipeletInfo);
                AsynchronousPublisherImpl.this.tcpConnect.configureBlocking(false);
                AsynchronousPublisherImpl.LOGGER.info("register socket to nio. " + pipeletInfo.getPipeletName());
                AsynchronousPublisherImpl.this.tcpConnect.register(AsynchronousPublisherImpl.this.selector, 1).attach(new AbstractNioSession.AttachHolder());
                AsynchronousPublisherImpl.this.sessionRuntime.waitTask = true;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected Socket buildSocketIfNotExist() {
                return AsynchronousPublisherImpl.this.tcpConnect.socket();
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected TransportStrategy getTransStrategy() {
                return AsynchronousPublisherImpl.this.pubStrategy;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected Receiver getReceiver() {
                return AsynchronousPublisherImpl.this.reciever;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected int getRole() {
                return 4;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected void afterCreateSession() throws IOException {
            }
        };
    }
}
