package org.apache.shardingsphere.scaling.core.execute.executor.channel;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.PlaceholderRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.scaling.core.job.position.Position;

/* loaded from: input_file:org/apache/shardingsphere/scaling/core/execute/executor/channel/DistributionChannel.class */
/**
 * Channel that fans records out over a fixed number of {@link MemoryChannel}s and
 * reports acknowledgements to the outer {@link AckCallback} in the order the records
 * were pushed.
 *
 * <p>Each consuming thread that calls {@link #fetchRecords(int, int)} or {@link #ack()}
 * is bound to one underlying channel on first use. A background task periodically
 * drains the head of the push-order queue for as long as the head record has been
 * acknowledged, and forwards the drained records to the outer callback.
 */
public final class DistributionChannel implements Channel {
    private final int channelNumber;
    private final AckCallback ackCallback;
    private final ScheduledExecutorService scheduleAckRecordsExecutor = Executors.newSingleThreadScheduledExecutor();
    /** Underlying channels keyed by their index rendered as a string; populated only in the constructor. */
    private final Map<String, MemoryChannel> channels = new HashMap<>();
    /**
     * Thread id (as string) to channel key. Read without holding the monitor in
     * {@link #checkAssignment(String)} and {@link #findChannel()}, hence a concurrent map.
     */
    private final Map<String, String> channelAssignment = new ConcurrentHashMap<>();
    /** Records in push order that have not yet been reported to the outer callback. */
    private final Queue<Record> toBeAcknowledgeRecords = new ConcurrentLinkedQueue<>();
    /** Records already acknowledged by an underlying channel (or placeholders), keyed by position. */
    private final Map<Position, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();

    /**
     * Callback handed to every underlying {@link MemoryChannel}; it only marks the
     * acknowledged records as pending so that {@link #ackRecords0()} can release them in order.
     */
    private final class SingleChannelAckCallback implements AckCallback {
        private SingleChannelAckCallback() {
        }

        @Override // org.apache.shardingsphere.scaling.core.execute.executor.channel.AckCallback
        public void onAck(List<Record> list) {
            for (Record each : list) {
                DistributionChannel.this.pendingAcknowledgeRecords.put(each.getPosition(), each);
            }
        }
    }

    /**
     * Creates the channel, its underlying memory channels, and starts the periodic
     * acknowledgement task.
     *
     * @param i number of underlying channels to create
     * @param ackCallback callback that receives the in-order acknowledged records
     */
    public DistributionChannel(int i, AckCallback ackCallback) {
        this.channelNumber = i;
        this.ackCallback = ackCallback;
        for (int index = 0; index < i; index++) {
            this.channels.put(Integer.toString(index), new MemoryChannel(new SingleChannelAckCallback()));
        }
        scheduleAckRecords();
    }

    /** Schedules {@link #ackRecords0()} to run every second after an initial five second delay. */
    private void scheduleAckRecords() {
        this.scheduleAckRecordsExecutor.scheduleAtFixedRate(this::ackRecords0, 5L, 1L, TimeUnit.SECONDS);
    }

    /**
     * Removes records from the head of the push-order queue while the head has already
     * been acknowledged, then forwards the removed records to the outer callback.
     * Stops at the first record that is not yet acknowledged so ordering is preserved.
     */
    private synchronized void ackRecords0() {
        List<Record> acknowledged = new LinkedList<>();
        while (!this.toBeAcknowledgeRecords.isEmpty()) {
            Record head = this.toBeAcknowledgeRecords.peek();
            if (!this.pendingAcknowledgeRecords.containsKey(head.getPosition())) {
                break;
            }
            acknowledged.add(head);
            this.toBeAcknowledgeRecords.poll();
            this.pendingAcknowledgeRecords.remove(head.getPosition());
        }
        if (!acknowledged.isEmpty()) {
            this.ackCallback.onAck(acknowledged);
        }
    }

    /**
     * Routes a record according to its exact runtime class.
     *
     * <ul>
     *   <li>{@link FinishedRecord}: pushed to every underlying channel.</li>
     *   <li>{@link DataRecord}: queued for acknowledgement and pushed to the channel
     *       selected by the record's hash code.</li>
     *   <li>{@link PlaceholderRecord}: queued and immediately marked as acknowledged;
     *       it is not pushed to any underlying channel.</li>
     * </ul>
     *
     * @param record record to push
     * @throws InterruptedException propagated from the underlying channel's {@code pushRecord}
     * @throws RuntimeException if the record is of any other class
     */
    @Override // org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel
    public void pushRecord(Record record) throws InterruptedException {
        Class<?> recordClass = record.getClass();
        if (FinishedRecord.class.equals(recordClass)) {
            for (MemoryChannel each : this.channels.values()) {
                each.pushRecord(record);
            }
            return;
        }
        if (DataRecord.class.equals(recordClass)) {
            this.toBeAcknowledgeRecords.add(record);
            DataRecord dataRecord = (DataRecord) record;
            // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is still negative,
            // which would yield a negative index and a missing channel. For every other hash
            // value this produces the same index as abs(hash) % channelNumber.
            int index = Math.abs(dataRecord.hashCode() % this.channelNumber);
            this.channels.get(Integer.toString(index)).pushRecord(dataRecord);
            return;
        }
        if (!PlaceholderRecord.class.equals(recordClass)) {
            throw new RuntimeException("Not Support Record Type");
        }
        this.toBeAcknowledgeRecords.add(record);
        this.pendingAcknowledgeRecords.put(record.getPosition(), record);
    }

    /**
     * Fetches records from the underlying channel bound to the calling thread.
     * Both arguments are forwarded unchanged to that channel.
     *
     * @param i first argument forwarded to the underlying channel's {@code fetchRecords}
     * @param i2 second argument forwarded to the underlying channel's {@code fetchRecords}
     * @return whatever the underlying channel returns
     * @throws IllegalStateException if no underlying channel is left for the calling thread
     */
    @Override // org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel
    public List<Record> fetchRecords(int i, int i2) {
        return findChannel().fetchRecords(i, i2);
    }

    /**
     * Acknowledges on the underlying channel bound to the calling thread.
     *
     * @throws IllegalStateException if no underlying channel is left for the calling thread
     */
    @Override // org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel
    public void ack() {
        findChannel().ack();
    }

    /**
     * Closes every underlying channel, stops the periodic acknowledgement task and
     * runs one final acknowledgement pass.
     */
    @Override // org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel
    public void close() {
        for (MemoryChannel each : this.channels.values()) {
            each.close();
        }
        this.scheduleAckRecordsExecutor.shutdown();
        ackRecords0();
    }

    /**
     * Returns the underlying channel bound to the calling thread, binding one first if needed.
     *
     * @throws IllegalStateException if every underlying channel is already bound to another thread
     */
    private Channel findChannel() {
        String threadId = Long.toString(Thread.currentThread().getId());
        checkAssignment(threadId);
        String channelId = this.channelAssignment.get(threadId);
        if (channelId == null) {
            // Previously this surfaced as a bare NullPointerException at the call site.
            throw new IllegalStateException("No free channel for thread " + threadId
                    + ": all " + this.channelNumber + " channels are already assigned");
        }
        return this.channels.get(channelId);
    }

    /** Binds the given thread id to a channel unless it already has one. */
    private void checkAssignment(String str) {
        if (this.channelAssignment.containsKey(str)) {
            return;
        }
        synchronized (this) {
            if (!this.channelAssignment.containsKey(str)) {
                assignmentChannel(str);
            }
        }
    }

    /**
     * Binds the given thread id to the first channel that no thread is bound to yet.
     * Leaves the assignment map untouched when every channel is taken.
     * Must be called while holding this object's monitor.
     */
    private void assignmentChannel(String str) {
        for (String channelId : this.channels.keySet()) {
            if (!this.channelAssignment.containsValue(channelId)) {
                this.channelAssignment.put(str, channelId);
                // Stop at the first free channel instead of repeatedly overwriting the binding.
                return;
            }
        }
    }
}
