package com.baidu.bigpipe.transport.sub;

import com.baidu.bigpipe.driver.converter.sub.MessageBodyConverter;
import com.baidu.bigpipe.position.store.SubcribePositionStore;
import com.baidu.bigpipe.protocol.BigpipePacket;
import com.baidu.bigpipe.protocol.meta.NameService;
import com.baidu.bigpipe.protocol.meta.concept.TopicAddress;
import com.baidu.bigpipe.protocol.meta.exp.NameResolveException;
import com.baidu.bigpipe.protocol.pb.BigpipePBProtocol;
import com.baidu.bigpipe.transport.AbstractSessionSocketStream;
import com.baidu.bigpipe.transport.BigpipeSessionSupport;
import com.baidu.bigpipe.transport.NHeadTransportStrategy;
import com.baidu.bigpipe.transport.Receiver;
import com.baidu.bigpipe.transport.SessionSocketStream;
import com.baidu.bigpipe.transport.TransportStrategy;
import com.baidu.bigpipe.transport.conf.BigPipeConf;
import com.baidu.bigpipe.transport.conf.SocketConf;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/baidu/bigpipe/transport/sub/AsynchronousSubscriberBioImpl.class */
public class AsynchronousSubscriberBioImpl extends BigpipeSessionSupport implements AsynchronousSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsynchronousSubscriberBioImpl.class);
    private volatile Socket socket;
    private TransportStrategy transStrategy = new NHeadTransportStrategy();
    private BigpipeMessageListener messageListener;
    private MessageBodyConverter bodyConverter;
    private SubcribePositionStore positionStore;
    private BigPipeConf bigPipeConf;
    private volatile long startPoint;

    public Receiver getReciever() {
        return this.reciever;
    }

    public void setReciever(Receiver receiver) {
        this.reciever = receiver;
    }

    public TransportStrategy getTransStrategy() {
        return this.transStrategy;
    }

    public void setTransStrategy(TransportStrategy transportStrategy) {
        this.transStrategy = transportStrategy;
    }

    public BigpipeMessageListener getMessageListener() {
        return this.messageListener;
    }

    public void setMessageListener(BigpipeMessageListener bigpipeMessageListener) {
        this.messageListener = bigpipeMessageListener;
    }

    public MessageBodyConverter getBodyConverter() {
        return this.bodyConverter;
    }

    public void setBodyConverter(MessageBodyConverter messageBodyConverter) {
        this.bodyConverter = messageBodyConverter;
    }

    public SubcribePositionStore getPositionStore() {
        return this.positionStore;
    }

    public void setPositionStore(SubcribePositionStore subcribePositionStore) {
        this.positionStore = subcribePositionStore;
    }

    @Override // com.baidu.bigpipe.transport.sub.AsynchronousSubscriber
    public void startSubscribe(BigpipeMessageListener bigpipeMessageListener, BigPipeConf bigPipeConf) {
        this.messageListener = bigpipeMessageListener;
        init(bigPipeConf);
    }

    @Override // com.baidu.bigpipe.transport.sub.AsynchronousSubscriber
    public void shutDown() {
        this.lifeController.setShutDown(true);
        try {
            safeCloseTcpConnect();
            this.lifeController.getShutDownWait().await(this.bigPipeConf.getShutDownTimeout(), TimeUnit.MINUTES);
        } catch (InterruptedException e) {
        }
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected void continueConfig(BigPipeConf bigPipeConf) {
        this.bigPipeConf = bigPipeConf;
        try {
            ensureStartPoint(initStartPoint(bigPipeConf));
        } catch (KeeperException e) {
            throw new RuntimeException((Throwable) e);
        } catch (NameResolveException e2) {
            throw new RuntimeException(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void threadCoreFunction() {
        while (!this.lifeController.isShutDown()) {
            if (this.socket == null) {
                buildConnect(false, this.bigPipeConf);
                if (this.lifeController.isShutDown()) {
                    this.lifeController.getShutDownWait().countDown();
                    return;
                }
            }
            doSubscribeMessage();
        }
        this.lifeController.getShutDownWait().countDown();
    }

    private void doSubscribeMessage() {
        try {
            ByteBuffer blockRecieve = this.reciever.blockRecieve(this.socket);
            if (blockRecieve == null) {
                LOGGER.info("read data from bigpipe failed, resubscribe.");
                safeCloseTcpConnect();
                return;
            }
            long[] jArr = {0};
            BigpipePBProtocol.MessageCommand[] messageCommandArr = {null};
            List<Object> extract = extract(blockRecieve, jArr, messageCommandArr);
            if (messageCommandArr[0] == null) {
                safeCloseTcpConnect();
                return;
            }
            if (extract.size() > 0) {
                this.messageListener.handle(extract, jArr[0]);
                this.startPoint = jArr[0] + 1;
            } else {
                LOGGER.info("recieve null message.");
            }
            continueNextMessage(messageCommandArr[0]);
        } catch (IOException e) {
            LOGGER.error("read from bigpipe failed,unkown io error, resubscribe.", e);
            safeCloseTcpConnect();
        } catch (RuntimeException e2) {
            LOGGER.error("read from bigpipe failed,unkown error, resubscribe.", e2);
            safeCloseTcpConnect();
        } catch (SocketTimeoutException e3) {
            LOGGER.error("read timeout from bigpipe failed, resubscribe.", e3);
            safeCloseTcpConnect();
        } catch (Exception e4) {
            LOGGER.error("read from bigpipe failed,unkown error, resubscribe.", e4);
            safeCloseTcpConnect();
        }
    }

    private void continueNextMessage(BigpipePBProtocol.MessageCommand messageCommand) {
        try {
            sendByteBufferData2Socket(this.socket, this.transStrategy.buildSimpleCommand(buildResponseMessage(messageCommand).toByteArray()));
        } catch (IOException e) {
            LOGGER.error("write to bigpipe failed,unkown error, resubscribe.", e);
            safeCloseTcpConnect();
        }
    }

    private BigpipePBProtocol.BigpipeCommand buildResponseMessage(BigpipePBProtocol.MessageCommand messageCommand) {
        BigpipePBProtocol.BigpipeCommand.Builder newBuilder = BigpipePBProtocol.BigpipeCommand.newBuilder();
        newBuilder.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_ACK);
        BigpipePBProtocol.AckCommand.Builder ackBuilder = newBuilder.getAckBuilder();
        ackBuilder.setDestination(messageCommand.getDestination());
        ackBuilder.setAckType(1);
        ackBuilder.setTopicMessageId(messageCommand.getTopicMessageId());
        ackBuilder.setReceiptId(messageCommand.getReceiptId());
        return newBuilder.build();
    }

    private List<Object> extract(ByteBuffer byteBuffer, long[] jArr, BigpipePBProtocol.MessageCommand[] messageCommandArr) {
        LinkedList linkedList = new LinkedList();
        byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
        byte[] bArr = new byte[byteBuffer.getInt()];
        byteBuffer.get(bArr);
        try {
            BigpipePBProtocol.BigpipeCommand parseFrom = BigpipePBProtocol.BigpipeCommand.parseFrom(bArr);
            messageCommandArr[0] = parseFrom.getMessage();
            if (parseFrom.getType() != BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_MESSAGE) {
                LOGGER.error(parseFrom.getError().toString());
                return linkedList;
            }
            jArr[0] = parseFrom.getMessage().getTopicMessageId();
            LOGGER.info("body size:" + byteBuffer.getInt());
            int i = 0;
            while (byteBuffer.remaining() > 4) {
                int i2 = byteBuffer.getInt();
                if (i2 > byteBuffer.remaining()) {
                    i2 = byteBuffer.remaining();
                }
                byte[] bArr2 = new byte[i2];
                byteBuffer.get(bArr2);
                linkedList.add(this.bodyConverter.bin2Object(bArr2));
                i++;
            }
            return linkedList;
        } catch (InvalidProtocolBufferException e) {
            LOGGER.error("BigpipeCommand.parseFrom error.", e);
            return linkedList;
        }
    }

    /* JADX WARN: String concatenation convert failed
    jadx.core.utils.exceptions.JadxRuntimeException: Can't remove SSA var: r9v0 java.lang.String, still in use, count: 1, list:
      (r9v0 java.lang.String) from STR_CONCAT 
      (r9v0 java.lang.String)
      (wrap:java.lang.String:0x002c: INVOKE (r7v0 com.baidu.bigpipe.transport.conf.BigPipeConf) VIRTUAL call: com.baidu.bigpipe.transport.conf.BigPipeConf.getThreadName():java.lang.String A[MD:():java.lang.String (m), WRAPPED])
      ("-")
     A[MD:():java.lang.String (c), SYNTHETIC, WRAPPED]
    	at jadx.core.utils.InsnRemover.removeSsaVar(InsnRemover.java:151)
    	at jadx.core.utils.InsnRemover.unbindResult(InsnRemover.java:116)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:80)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.utils.InsnRemover.unbindInsn(InsnRemover.java:79)
    	at jadx.core.utils.InsnRemover.unbindArgUsage(InsnRemover.java:163)
    	at jadx.core.utils.InsnRemover.unbindAllArgs(InsnRemover.java:95)
    	at jadx.core.dex.visitors.SimplifyVisitor.removeStringBuilderInsns(SimplifyVisitor.java:495)
    	at jadx.core.dex.visitors.SimplifyVisitor.convertStringBuilderChain(SimplifyVisitor.java:422)
    	at jadx.core.dex.visitors.SimplifyVisitor.convertInvoke(SimplifyVisitor.java:314)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyInsn(SimplifyVisitor.java:145)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyArgs(SimplifyVisitor.java:114)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyInsn(SimplifyVisitor.java:132)
    	at jadx.core.dex.visitors.SimplifyVisitor.simplifyBlock(SimplifyVisitor.java:86)
    	at jadx.core.dex.visitors.SimplifyVisitor.visit(SimplifyVisitor.java:71)
     */
    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    public void start(BigPipeConf bigPipeConf) {
        String str;
        buildConnect(true, bigPipeConf);
        Thread thread = new Thread(new Runnable() { // from class: com.baidu.bigpipe.transport.sub.AsynchronousSubscriberBioImpl.1
            @Override // java.lang.Runnable
            public void run() {
                AsynchronousSubscriberBioImpl.this.threadCoreFunction();
            }
        });
        thread.setName(new StringBuilder().append(bigPipeConf.getThreadName() != null ? str + bigPipeConf.getThreadName() + "-" : "bigpipe-client-thread-").append(thread.getId()).toString());
        thread.start();
        try {
            this.lifeController.getThreadRunning().await();
        } catch (InterruptedException e) {
            LOGGER.error("start thread error.", e);
            throw new RuntimeException(e);
        }
    }

    private long convertPositionFromFixedStartPoint(long j) {
        if (j == -1) {
            return Long.MAX_VALUE;
        }
        return j;
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected void waitingForConnect(int i) {
        LOGGER.warn("reconnect bigpipe failed.{} times", Integer.valueOf(i));
        long j = 60000;
        if (i < 5) {
            j = 500;
        }
        if (i > 5) {
            LOGGER.warn("WARNING:failed to reconnect bigpipe more than 60s.");
        }
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void startSubcribe(Socket socket, Receiver receiver) throws IOException {
        BigpipePBProtocol.BigpipeCommand.Builder newBuilder = BigpipePBProtocol.BigpipeCommand.newBuilder();
        newBuilder.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_SUBSCRIBE);
        BigpipePBProtocol.SubscribeCommand.Builder subscribeBuilder = newBuilder.getSubscribeBuilder();
        subscribeBuilder.setDestination(this.pipeRuntime.getTopicName());
        subscribeBuilder.setStartPoint(this.startPoint);
        LOGGER.info("subscribe start point:" + this.startPoint);
        subscribeBuilder.setReceiptId(System.currentTimeMillis() + "-" + Math.random());
        sendByteBufferData2Socket(socket, this.transStrategy.buildSimpleCommand(newBuilder.build().toByteArray()));
        ByteBuffer blockRecieve = receiver.blockRecieve(socket);
        if (blockRecieve == null) {
            throw new RuntimeException("connect session failed");
        }
        BigpipePacket parseCommand = AbstractSessionSocketStream.parseCommand(blockRecieve);
        if (parseCommand.getCommand().getType() != BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_RECEIPT) {
            throw new RuntimeException(parseCommand.getCommand().getError().toString());
        }
    }

    private void sendByteBufferData2Socket(Socket socket, ByteBuffer byteBuffer) throws IOException {
        socket.getOutputStream().write(byteBuffer.array(), 0, byteBuffer.limit());
    }

    private long initStartPoint(BigPipeConf bigPipeConf) {
        Long loadPosition;
        return (this.positionStore == null || (loadPosition = this.positionStore.loadPosition()) == null) ? bigPipeConf.getDefStartPoint() : Long.valueOf(loadPosition.longValue() + 1).longValue();
    }

    private void ensureStartPoint(long j) throws NameResolveException, KeeperException {
        if (j != -2) {
            if (j == -1) {
                this.startPoint = -1L;
                return;
            } else {
                this.startPoint = j;
                return;
            }
        }
        NameService ns = this.pipeRuntime.getNs();
        String pipeletName = getPipeletName();
        TopicAddress lookupForSub = ns.lookupForSub(pipeletName, 0L);
        if (lookupForSub == null) {
            throw new NameResolveException(pipeletName, Long.MAX_VALUE, "sub stripe not found");
        }
        this.startPoint = lookupForSub.getStripe().getBeginPos();
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected SessionSocketStream openStream(TopicAddress topicAddress, SocketConf socketConf) throws IOException {
        this.socket = new Socket();
        this.socket.connect(topicAddress.getAddress(), socketConf.getConectTimeout());
        this.socket.setSoTimeout(socketConf.getIoTimeout());
        return new AbstractSessionSocketStream() { // from class: com.baidu.bigpipe.transport.sub.AsynchronousSubscriberBioImpl.2
            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected Socket buildSocketIfNotExist() {
                return AsynchronousSubscriberBioImpl.this.socket;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected TransportStrategy getTransStrategy() {
                return AsynchronousSubscriberBioImpl.this.transStrategy;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected Receiver getReceiver() {
                return AsynchronousSubscriberBioImpl.this.reciever;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected int getRole() {
                return 5;
            }

            @Override // com.baidu.bigpipe.transport.AbstractSessionSocketStream
            protected void afterCreateSession() throws IOException {
                AsynchronousSubscriberBioImpl.this.startSubcribe(AsynchronousSubscriberBioImpl.this.socket, AsynchronousSubscriberBioImpl.this.reciever);
            }
        };
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected TopicAddress lookupAddr(NameService nameService, String str) throws NameResolveException, KeeperException {
        return nameService.lookupForSub(str, convertPositionFromFixedStartPoint(this.startPoint));
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected void handleFastFailed(boolean z) {
    }

    @Override // com.baidu.bigpipe.transport.BigpipeSessionSupport
    protected void safeCloseTcpConnect() {
        if (this.socket == null) {
            return;
        }
        try {
            try {
                this.socket.close();
                this.socket = null;
            } catch (IOException e) {
                LOGGER.info("safeCloseTcpConnect", e);
                this.socket = null;
            }
        } catch (Throwable th) {
            this.socket = null;
            throw th;
        }
    }
}
