package com.baidu.bigpipe.transport.pub;

import com.baidu.bigpipe.protocol.LogIdGen;
import com.baidu.bigpipe.protocol.SessionIdProvider;
import com.baidu.bigpipe.protocol.pb.BigpipePBProtocol;
import com.baidu.bigpipe.transport.NSHead;
import com.baidu.bigpipe.transport.pub.context.WriteTask;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.Adler32;

/* loaded from: input_file:com/baidu/bigpipe/transport/pub/GroupPublishStrategy.class */
/**
 * Publish strategy that sends messages in groups: every submission (a single message or a
 * list of messages sharing one future) becomes a {@link PubTask}. Tasks wait in
 * {@code taskQueue} until {@link #getNextTask} promotes them into {@code runingMap}, from
 * which their messages are handed out one at a time. A task completes once every one of
 * its messages has been acknowledged through {@code handleFinish}.
 *
 * <p>{@code runingMap} is a plain {@link LinkedHashMap} and this class performs no
 * synchronization of its own.
 */
public class GroupPublishStrategy extends AbstractPublishStrategy implements PublishStrategy {
    /** Concurrency limit passed to {@code setMaxConcurrent} by the constructor. */
    private static final int DEFAULT_MAX_CONCURRENT = 2000;

    /** Number of bytes at the start of every frame that are filled from {@code NSHead.toBytes()}. */
    private static final int NSHEAD_LENGTH = 36;

    /** Size of the int that stores the serialized command length right after the head. */
    private static final int COMMAND_LENGTH_FIELD = 4;

    /** Two ints written in front of the body when the message is not already packed. */
    private static final int UNPACKED_PREFIX_LENGTH = 8;

    /** Tasks submitted but not yet started, in submission order. */
    private final BlockingQueue<PubTask> taskQueue = new LinkedBlockingQueue<PubTask>();

    /** Started tasks keyed by task log id, in start order. */
    private final Map<String, PubTask> runingMap = new LinkedHashMap<String, PubTask>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/baidu/bigpipe/transport/pub/GroupPublishStrategy$PubTask.class */
    /** Book-keeping for one submission: its messages, its future and its send progress. */
    public static class PubTask {
        /** Messages that belong to this task. */
        List<Message> list;
        /** Future triggered when the task completes, fails or times out. */
        InternalFutrue futrue;
        /** Task-level log id; also the key of this task in the running map. */
        String logId;
        /** Index into {@code list} of the next message to hand out. */
        int index;
        /** Message-level log ids that have been acknowledged so far. */
        List<String> finList;
        /** Wall-clock millis at submission. */
        long pubTime;
        /** Wall-clock millis at which a message of this task was last handed out. */
        long writeTime;

        private PubTask() {
            this.list = new LinkedList<Message>();
            this.index = 0;
            this.finList = new LinkedList<String>();
        }
    }

    /** Creates the strategy and sets the inherited concurrency limit to 2000. */
    public GroupPublishStrategy() {
        super.setMaxConcurrent(DEFAULT_MAX_CONCURRENT);
    }

    /**
     * Queues a single message as its own task, using the message's future and log id as the
     * task's future and log id.
     *
     * @param message the message to publish
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public void submitMessage(Message message) {
        PubTask task = new PubTask();
        task.pubTime = System.currentTimeMillis();
        task.list.add(message);
        task.futrue = message.future;
        task.logId = message.getLogId();
        this.taskQueue.offer(task);
    }

    /**
     * Queues a list of messages as one task. The list is stored by reference, not copied;
     * the task log id is assigned later, when the task is started.
     *
     * @param list           messages of the group; NOTE(review): {@link #getNextTask} reads
     *                       the first element unconditionally when it starts the task, so an
     *                       empty list fails there — confirm callers never pass one
     * @param internalFutrue future triggered when the group completes or fails
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public void submitMessage(List<Message> list, InternalFutrue internalFutrue) {
        PubTask task = new PubTask();
        task.pubTime = System.currentTimeMillis();
        task.list = list;
        task.futrue = internalFutrue;
        this.taskQueue.offer(task);
    }

    /**
     * Produces the next frame to write. Running tasks are scanned in start order and the
     * first one with an unsent message supplies it; only when none has anything left is a
     * new task taken from the waiting queue (subject to {@code canRunTask()}).
     *
     * @param logIdGen generator used for the task log id when the task has none yet
     * @param j        session message id stored in the command and on the write task
     * @param str      session id stored in the command
     * @param str2     destination stored in the command
     * @return the write task, or {@code null} when there is nothing to send or no new task
     *         may be started
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public WriteTask getNextTask(LogIdGen logIdGen, long j, String str, String str2) {
        Message message = null;
        String taskLogId = null;
        for (PubTask running : this.runingMap.values()) {
            if (running.index < running.list.size()) {
                taskLogId = running.logId;
                message = running.list.get(running.index);
                running.index++;
                running.writeTime = System.currentTimeMillis();
                break;
            }
        }
        if (message == null) {
            if (!super.canRunTask()) {
                return null;
            }
            PubTask next = this.taskQueue.poll();
            if (next == null) {
                return null;
            }
            super.registerRunTask();
            if (next.logId == null) {
                next.logId = logIdGen.genId() + "";
            }
            taskLogId = next.logId;
            this.runingMap.put(next.logId, next);
            message = next.list.get(next.index);
            next.writeTime = System.currentTimeMillis();
            next.index++;
        }
        // The receipt id is "<task log id>.<message log id>"; handleFinish splits it again.
        WriteTask writeTask = new WriteTask();
        writeTask.setLogId(taskLogId + "." + message.getLogId());
        writeTask.setBuf(packMessage(message, writeTask.getLogId(), j, str, str2));
        writeTask.setSessionMessageId(j);
        return writeTask;
    }

    /**
     * Fails every running task, then drains the waiting queue and triggers each queued
     * task's future with all of its messages and the reason {@code "shut down"}.
     *
     * @param sessionIdProvider source of the session id reported to the futures
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public void handleShutDown(SessionIdProvider sessionIdProvider) {
        fastFailedRunning(sessionIdProvider);
        // A single poll() per iteration avoids dereferencing null when the head is removed
        // between a peek() and the following poll().
        PubTask queued;
        while ((queued = this.taskQueue.poll()) != null) {
            queued.futrue.trigger(queued.list, "shut down", sessionIdProvider.getSessionId(false));
        }
    }

    /**
     * Fails every running task; waiting tasks are left in the queue.
     *
     * @param sessionIdProvider source of the session id reported to the futures
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public void fastFailed(SessionIdProvider sessionIdProvider) {
        fastFailedRunning(sessionIdProvider);
    }

    /**
     * Triggers the future of every running task with its unacknowledged messages and the
     * reason {@code "io error."}, unregisters each task, releases tokens and clears the
     * running map.
     *
     * @param sessionIdProvider source of the session id reported to the futures
     */
    @Override // com.baidu.bigpipe.transport.pub.AbstractPublishStrategy
    protected void fastFailedRunning(SessionIdProvider sessionIdProvider) {
        int tokens = 0;
        for (PubTask running : this.runingMap.values()) {
            List<Message> failed = collectFailedTaskFromTask(running);
            // NOTE(review): counts every message of the task, although handleFinish already
            // released one token per acknowledged message — confirm this is intended.
            tokens += running.list.size();
            running.futrue.trigger(failed, "io error.", sessionIdProvider.getSessionId(false));
            super.unRegisterRunTask();
        }
        super.releaseToken(tokens);
        this.runingMap.clear();
    }

    /**
     * Records the acknowledgement of one message. When all messages of the owning task are
     * acknowledged the task is removed, unregistered and its future is triggered with an
     * empty failure list and a {@code null} reason.
     *
     * <p>The id is split at its first dot only, so a message log id that itself contains
     * dots is recorded in full. Ids without a dot, and ids whose task is no longer running,
     * are ignored.
     *
     * @param str               receipt id of the form {@code <task log id>.<message log id>}
     * @param j                 not used by this implementation
     * @param sessionIdProvider source of the session id reported to the future
     */
    @Override // com.baidu.bigpipe.transport.pub.AbstractPublishStrategy
    protected void handleFinish(String str, long j, SessionIdProvider sessionIdProvider) {
        int dot = str.indexOf('.');
        if (dot < 0) {
            return;
        }
        String taskLogId = str.substring(0, dot);
        PubTask task = this.runingMap.get(taskLogId);
        if (task == null) {
            return;
        }
        task.finList.add(str.substring(dot + 1));
        if (task.finList.size() == task.list.size()) {
            this.runingMap.remove(taskLogId);
            super.unRegisterRunTask();
            task.futrue.trigger(Collections.emptyList(), null, sessionIdProvider.getSessionId(false));
        }
        super.releaseToken();
    }

    /**
     * Returns, in list order, the messages of the task whose log id has not been
     * acknowledged yet.
     */
    private List<Message> collectFailedTaskFromTask(PubTask pubTask) {
        Set<String> acknowledged = new HashSet<String>(pubTask.finList);
        List<Message> failed = new LinkedList<Message>();
        for (Message message : pubTask.list) {
            if (!acknowledged.contains(message.getLogId())) {
                failed.add(message);
            }
        }
        return failed;
    }

    /**
     * Serializes one message into a little-endian frame laid out as: 36 head bytes, an int
     * holding the serialized command length, the serialized command, then the payload. For
     * a message that is not already packed the payload is two ints (payload length, body
     * length) followed by the body; otherwise it is the body alone.
     *
     * @param message message to serialize
     * @param str     receipt id stored in the command
     * @param j       session message id stored in the command
     * @param str2    session id stored in the command
     * @param str3    destination stored in the command
     * @return the frame, flipped and ready to be read from position 0
     */
    private ByteBuffer packMessage(Message message, String str, long j, String str2, String str3) {
        byte[] body = message.getBody();
        int payloadLength = message.packedMessage ? body.length : UNPACKED_PREFIX_LENGTH + body.length;

        BigpipePBProtocol.BigpipeCommand.Builder commandBuilder = BigpipePBProtocol.BigpipeCommand.newBuilder();
        commandBuilder.setType(BigpipePBProtocol.BigpipeCommand.CommandType.BMQ_SEND);
        BigpipePBProtocol.MessageCommand.Builder messageBuilder = commandBuilder.getMessageBuilder();
        messageBuilder.setDestination(str3);
        messageBuilder.setNoDup(false);
        messageBuilder.setSessionMessageId(j);
        messageBuilder.setReceiptId(str);
        messageBuilder.setSessionId(str2);
        messageBuilder.setMessageLength(payloadLength);
        byte[] command = commandBuilder.build().toByteArray();

        ByteBuffer frame = ByteBuffer.allocate(NSHEAD_LENGTH + COMMAND_LENGTH_FIELD + command.length + payloadLength);
        frame.order(ByteOrder.LITTLE_ENDIAN);
        // Leave room for the head; it is copied in last because it carries the checksum.
        frame.position(NSHEAD_LENGTH);
        frame.putInt(command.length);
        frame.put(command);
        if (!message.packedMessage) {
            frame.putInt(payloadLength);
            frame.putInt(body.length);
        }
        frame.put(body);

        NSHead head = NSHead.factory(getProvider());
        head.setBodyLen(command.length + COMMAND_LENGTH_FIELD + payloadLength);
        byte[] raw = frame.array();
        Adler32 checksum = new Adler32();
        checksum.update(raw, NSHEAD_LENGTH, (int) head.getBodyLen());
        // The Adler-32 of everything after the head goes into the head's reserved field.
        head.setReserved((int) checksum.getValue());
        System.arraycopy(head.toBytes(), 0, raw, 0, NSHEAD_LENGTH);
        frame.flip();
        return frame;
    }

    /** @return the number of tasks currently in the running map */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public int getCurrentTaskCount() {
        return this.runingMap.size();
    }

    /**
     * Fails waiting and running tasks that are older than the configured I/O timeout.
     *
     * <p>Both sweeps run on every call. Chaining them with a short-circuit {@code ||} would
     * skip the running-task sweep whenever a waiting task expired in the same pass.
     *
     * @param sessionIdProvider source of the session id reported to the futures
     * @return always {@code false}; NOTE(review): the sweeps report whether anything timed
     *         out, but this method has never propagated that — confirm callers rely on the
     *         constant before changing it
     */
    @Override // com.baidu.bigpipe.transport.pub.PublishStrategy
    public boolean handlePubTimeout(SessionIdProvider sessionIdProvider) {
        long deadline = System.currentTimeMillis() - this.socketConf.getIoTimeout();
        handleWaitingTimeout(deadline, sessionIdProvider);
        handleRunningingTimeout(deadline, sessionIdProvider);
        return false;
    }

    /**
     * Removes tasks from the head of the waiting queue while their submission time is at or
     * before {@code j}, triggering each future with all of the task's messages and
     * releasing one token per message. Stops at the first task that has not expired.
     *
     * @return {@code true} if at least one waiting task was timed out
     */
    private boolean handleWaitingTimeout(long j, SessionIdProvider sessionIdProvider) {
        boolean timedOut = false;
        PubTask head = this.taskQueue.peek();
        while (head != null && head.pubTime <= j) {
            timedOut = true;
            this.taskQueue.poll();
            int messageCount = head.list.size();
            head.futrue.trigger(head.list, "timeout, waiting to send", sessionIdProvider.getSessionId(false));
            super.releaseToken(messageCount);
            head = this.taskQueue.peek();
        }
        return timedOut;
    }

    /**
     * Fails every running task whose last write time is at or before {@code j}: its future
     * is triggered with the unacknowledged messages, tokens are released, and once the scan
     * is over the task is unregistered and removed from the running map.
     *
     * @return {@code true} if at least one running task was timed out
     */
    private boolean handleRunningingTimeout(long j, SessionIdProvider sessionIdProvider) {
        boolean timedOut = false;
        // Removal is deferred until after the scan so the map is not modified while iterating.
        List<String> expiredIds = new LinkedList<String>();
        for (Map.Entry<String, PubTask> entry : this.runingMap.entrySet()) {
            String taskLogId = entry.getKey();
            PubTask running = entry.getValue();
            if (running.writeTime > j) {
                continue;
            }
            timedOut = true;
            List<Message> failed = collectFailedTaskFromTask(running);
            // NOTE(review): releases one token per message of the task, acknowledged or
            // not — same question as in fastFailedRunning.
            int messageCount = running.list.size();
            running.futrue.trigger(failed, "timeout,sending,logid is " + taskLogId, sessionIdProvider.getSessionId(false));
            super.releaseToken(messageCount);
            expiredIds.add(taskLogId);
        }
        for (String taskLogId : expiredIds) {
            super.unRegisterRunTask();
            this.runingMap.remove(taskLogId);
        }
        return timedOut;
    }
}
