package org.jgroups.protocols.pbcast;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.Digest;
import org.jgroups.util.ShutdownRejectedExecutionHandler;
import org.jgroups.util.StateTransferResult;
import org.jgroups.util.Util;
import org.objectweb.asm.Opcodes;

@MBean(description = "Streaming state transfer protocol base class")
/* loaded from: input_file:extensions/fabric3-jgroups-2.5.3.jar:META-INF/lib/jgroups-3.3.0.Final.jar:org/jgroups/protocols/pbcast/StreamingStateTransfer.class */
public abstract class StreamingStateTransfer extends Protocol {
    protected ThreadPoolExecutor thread_pool;

    @Property(description = "Size (in bytes) of the state transfer buffer")
    protected int buffer_size = Opcodes.ACC_ANNOTATION;

    @Property(description = "Maximum number of pool threads serving state requests")
    protected int max_pool = 5;

    @Property(description = "Keep alive for pool threads serving state requests")
    protected long pool_thread_keep_alive = 20000;
    protected final AtomicInteger num_state_reqs = new AtomicInteger(0);
    protected final AtomicLong num_bytes_sent = new AtomicLong(0);
    protected double avg_state_size = 0.0d;
    protected Address local_addr = null;
    protected volatile Address state_provider = null;
    protected final List<Address> members = new ArrayList();
    protected volatile boolean flushProtocolInStack = false;

    @ManagedAttribute(description = "whether or not the barrier is closed")
    protected AtomicBoolean barrier_closed = new AtomicBoolean(false);
    protected final Map<Address, OutputStream> pending_state_transfers = new HashMap();
    protected final Lock state_lock = new ReentrantLock();

    /* loaded from: input_file:extensions/fabric3-jgroups-2.5.3.jar:META-INF/lib/jgroups-3.3.0.Final.jar:org/jgroups/protocols/pbcast/StreamingStateTransfer$StateGetter.class */
    protected class StateGetter implements Runnable {
        protected final Address requester;
        protected final OutputStream output;

        public StateGetter(Address address, OutputStream outputStream) {
            this.requester = address;
            this.output = outputStream;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    if (StreamingStateTransfer.this.log.isTraceEnabled()) {
                        StreamingStateTransfer.this.log.trace(StreamingStateTransfer.this.local_addr + ": getting the state from the application");
                    }
                    StreamingStateTransfer.this.up_prot.up(new Event(72, this.output));
                    this.output.flush();
                    StreamingStateTransfer.this.sendEof(this.requester);
                    StreamingStateTransfer.this.state_lock.lock();
                    try {
                        StreamingStateTransfer.this.removeRequester(this.requester);
                        StreamingStateTransfer.this.state_lock.unlock();
                    } finally {
                    }
                } catch (Throwable th) {
                    StreamingStateTransfer.this.state_lock.lock();
                    try {
                        StreamingStateTransfer.this.removeRequester(this.requester);
                        StreamingStateTransfer.this.state_lock.unlock();
                        throw th;
                    } finally {
                    }
                }
            } catch (Throwable th2) {
                if (StreamingStateTransfer.this.log.isWarnEnabled()) {
                    StreamingStateTransfer.this.log.warn(StreamingStateTransfer.this.local_addr + ": failed getting the state from the application", th2);
                }
                StreamingStateTransfer.this.sendException(this.requester, th2);
                StreamingStateTransfer.this.state_lock.lock();
                try {
                    StreamingStateTransfer.this.removeRequester(this.requester);
                    StreamingStateTransfer.this.state_lock.unlock();
                } finally {
                    StreamingStateTransfer.this.state_lock.unlock();
                }
            }
        }
    }

    /* loaded from: input_file:extensions/fabric3-jgroups-2.5.3.jar:META-INF/lib/jgroups-3.3.0.Final.jar:org/jgroups/protocols/pbcast/StreamingStateTransfer$StateHeader.class */
    public static class StateHeader extends Header {
        public static final byte STATE_REQ = 1;
        public static final byte STATE_RSP = 2;
        public static final byte STATE_PART = 3;
        public static final byte STATE_EOF = 4;
        public static final byte STATE_EX = 5;
        protected byte type;
        protected Digest my_digest;
        protected IpAddress bind_addr;

        public StateHeader() {
            this.type = (byte) 0;
            this.my_digest = null;
            this.bind_addr = null;
        }

        public StateHeader(byte b) {
            this.type = (byte) 0;
            this.my_digest = null;
            this.bind_addr = null;
            this.type = b;
        }

        public StateHeader(byte b, Digest digest) {
            this.type = (byte) 0;
            this.my_digest = null;
            this.bind_addr = null;
            this.type = b;
            this.my_digest = digest;
        }

        public StateHeader(byte b, IpAddress ipAddress, Digest digest) {
            this.type = (byte) 0;
            this.my_digest = null;
            this.bind_addr = null;
            this.type = b;
            this.my_digest = digest;
            this.bind_addr = ipAddress;
        }

        public int getType() {
            return this.type;
        }

        public Digest getDigest() {
            return this.my_digest;
        }

        @Override // org.jgroups.Header
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("type=").append(type2Str(this.type));
            if (this.my_digest != null) {
                sb.append(", digest=").append(this.my_digest);
            }
            if (this.bind_addr != null) {
                sb.append(", bind_addr=" + this.bind_addr);
            }
            return sb.toString();
        }

        static String type2Str(int i) {
            switch (i) {
                case 1:
                    return "STATE_REQ";
                case 2:
                    return "STATE_RSP";
                case 3:
                    return "STATE_PART";
                case 4:
                    return "STATE_EOF";
                case 5:
                    return "STATE_EX";
                default:
                    return "<unknown>";
            }
        }

        @Override // org.jgroups.util.Streamable
        public void writeTo(DataOutput dataOutput) throws Exception {
            dataOutput.writeByte(this.type);
            Util.writeStreamable(this.my_digest, dataOutput);
            Util.writeStreamable(this.bind_addr, dataOutput);
        }

        @Override // org.jgroups.util.Streamable
        public void readFrom(DataInput dataInput) throws Exception {
            this.type = dataInput.readByte();
            this.my_digest = (Digest) Util.readStreamable(Digest.class, dataInput);
            this.bind_addr = (IpAddress) Util.readStreamable(IpAddress.class, dataInput);
        }

        @Override // org.jgroups.Header
        public int size() {
            int i = 1 + 1;
            if (this.my_digest != null) {
                i = (int) (i + this.my_digest.serializedSize());
            }
            return i + Util.size(this.bind_addr);
        }
    }

    @ManagedAttribute
    public int getNumberOfStateRequests() {
        return this.num_state_reqs.get();
    }

    @ManagedAttribute
    public long getNumberOfStateBytesSent() {
        return this.num_bytes_sent.get();
    }

    @ManagedAttribute
    public double getAverageStateSize() {
        return this.avg_state_size;
    }

    @ManagedAttribute
    public int getThreadPoolSize() {
        return this.thread_pool.getPoolSize();
    }

    @ManagedAttribute
    public long getThreadPoolCompletedTasks() {
        return this.thread_pool.getCompletedTaskCount();
    }

    @Override // org.jgroups.stack.Protocol
    public List<Integer> requiredDownServices() {
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(39);
        arrayList.add(42);
        return arrayList;
    }

    @Override // org.jgroups.stack.Protocol
    public void resetStats() {
        super.resetStats();
        this.num_state_reqs.set(0);
        this.num_bytes_sent.set(0L);
        this.avg_state_size = 0.0d;
    }

    @Override // org.jgroups.stack.Protocol
    public void init() throws Exception {
        super.init();
        this.thread_pool = createThreadPool();
    }

    @Override // org.jgroups.stack.Protocol
    public void destroy() {
        this.thread_pool.shutdown();
        super.destroy();
    }

    @Override // org.jgroups.stack.Protocol
    public void start() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("state_transfer", true);
        hashMap.put("protocol_class", getClass().getName());
        this.up_prot.up(new Event(56, hashMap));
        if (this.buffer_size <= 0) {
            throw new IllegalArgumentException("buffer_size has to be > 0");
        }
    }

    @Override // org.jgroups.stack.Protocol
    public void stop() {
        super.stop();
        this.barrier_closed.set(false);
    }

    @Override // org.jgroups.stack.Protocol
    public Object down(Event event) {
        Address address;
        switch (event.getType()) {
            case 6:
                handleViewChange((View) event.getArg());
                break;
            case 8:
                this.local_addr = (Address) event.getArg();
                break;
            case Event.GET_STATE /* 19 */:
                StateTransferInfo stateTransferInfo = (StateTransferInfo) event.getArg();
                if (stateTransferInfo.target == null) {
                    address = determineCoordinator();
                } else {
                    address = stateTransferInfo.target;
                    if (address.equals(this.local_addr)) {
                        if (this.log.isErrorEnabled()) {
                            this.log.error("cannot fetch state from myself !");
                        }
                        address = null;
                    }
                }
                if (address == null) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("first member (no state)");
                    }
                    this.up_prot.up(new Event(73, new StateTransferResult()));
                    return null;
                }
                this.state_provider = address;
                Message putHeader = new Message(address).putHeader(this.id, new StateHeader((byte) 1));
                if (this.log.isDebugEnabled()) {
                    this.log.debug(this.local_addr + ": asking " + address + " for state");
                }
                this.down_prot.down(new Event(1, putHeader));
                return null;
            case 56:
                handleConfig((Map) event.getArg());
                break;
        }
        return this.down_prot.down(event);
    }

    @Override // org.jgroups.stack.Protocol, org.jgroups.UpHandler
    public Object up(Event event) {
        switch (event.getType()) {
            case 1:
                Message message = (Message) event.getArg();
                StateHeader stateHeader = (StateHeader) message.getHeader(this.id);
                if (stateHeader != null) {
                    Address src = message.getSrc();
                    switch (stateHeader.type) {
                        case 1:
                            handleStateReq(src);
                            return null;
                        case 2:
                            handleStateRsp(src, stateHeader);
                            return null;
                        case 3:
                            handleStateChunk(src, message.getRawBuffer(), message.getOffset(), message.getLength());
                            return null;
                        case 4:
                            if (this.log.isTraceEnabled()) {
                                this.log.trace(this.local_addr + " <-- EOF <-- " + src);
                            }
                            handleEOF(src);
                            return null;
                        case 5:
                            handleException((Throwable) message.getObject());
                            return null;
                        default:
                            if (!this.log.isErrorEnabled()) {
                                return null;
                            }
                            this.log.error("type " + ((int) stateHeader.type) + " not known in StateHeader");
                            return null;
                    }
                }
                break;
            case 6:
            case 15:
                handleViewChange((View) event.getArg());
                break;
            case 56:
                handleConfig((Map) event.getArg());
                break;
        }
        return this.up_prot.up(event);
    }

    protected boolean isDigestNeeded() {
        return !this.flushProtocolInStack;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleConfig(Map<String, Object> map) {
        if (map != null && map.containsKey("flush_supported")) {
            this.flushProtocolInStack = true;
        }
        if (map != null && map.containsKey("state_transfer")) {
            throw new IllegalArgumentException("Protocol stack must have only one state transfer protocol");
        }
    }

    protected void handleStateChunk(Address address, byte[] bArr, int i, int i2) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleEOF(Address address) {
        this.state_provider = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleException(Throwable th) {
        this.state_provider = null;
        openBarrierAndResumeStable();
        this.up_prot.up(new Event(73, new StateTransferResult(th)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void getStateFromApplication(Address address, OutputStream outputStream, boolean z) {
        if (outputStream == null || address == null) {
            throw new IllegalArgumentException("output stream and requester's address have to be non-null");
        }
        this.state_lock.lock();
        try {
            try {
                if (this.pending_state_transfers.containsKey(address)) {
                    throw new IllegalStateException("requester " + address + " has a pending state transfer; concurrent state transfers from the same member are not supported");
                }
                if (this.pending_state_transfers.isEmpty()) {
                    closeBarrierAndSuspendStable();
                }
                this.pending_state_transfers.put(address, outputStream);
                StateGetter stateGetter = new StateGetter(address, outputStream);
                if (z) {
                    this.thread_pool.execute(stateGetter);
                } else {
                    stateGetter.run();
                }
                this.state_lock.unlock();
            } catch (Throwable th) {
                if (0 != 0) {
                    openBarrierAndResumeStable();
                }
                sendException(address, th);
                this.pending_state_transfers.remove(address);
                this.state_lock.unlock();
            }
        } catch (Throwable th2) {
            this.state_lock.unlock();
            throw th2;
        }
    }

    protected void removeRequester(Address address) {
        if (address == null) {
            return;
        }
        OutputStream remove = this.pending_state_transfers.remove(address);
        Util.close(remove);
        if (remove == null || !this.pending_state_transfers.isEmpty()) {
            return;
        }
        openBarrierAndResumeStable();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setStateInApplication(Address address, InputStream inputStream, Digest digest) {
        closeBarrierAndSuspendStable();
        if (digest != null) {
            try {
                this.down_prot.down(new Event(42, digest));
            } catch (Throwable th) {
                handleException(th);
                return;
            }
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace(this.local_addr + ": setting the state in the aplication");
        }
        this.up_prot.up(new Event(71, inputStream));
        openBarrierAndResumeStable();
        this.up_prot.up(new Event(73, new StateTransferResult()));
    }

    @ManagedOperation(description = "Closes BARRIER and suspends STABLE")
    public void closeBarrierAndSuspendStable() {
        if (isDigestNeeded() && this.barrier_closed.compareAndSet(false, true)) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": sending down CLOSE_BARRIER and SUSPEND_STABLE");
            }
            this.down_prot.down(new Event(76));
            this.down_prot.down(new Event(65));
        }
    }

    @ManagedOperation(description = "Opens BARRIER and resumes STABLE")
    public void openBarrierAndResumeStable() {
        if (isDigestNeeded() && this.barrier_closed.compareAndSet(true, false)) {
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + ": sending down OPEN_BARRIER and RESUME_STABLE");
            }
            this.down_prot.down(new Event(77));
            this.down_prot.down(new Event(66));
        }
    }

    protected void sendEof(Address address) {
        try {
            Message putHeader = new Message(address).putHeader(getId(), new StateHeader((byte) 4));
            if (this.log.isTraceEnabled()) {
                this.log.trace(this.local_addr + " --> EOF --> " + address);
            }
            down(new Event(1, putHeader));
        } catch (Throwable th) {
            this.log.error(this.local_addr + ": failed sending EOF to " + address);
        }
    }

    protected void sendException(Address address, Throwable th) {
        try {
            down(new Event(1, new Message(address, (Address) null, th).putHeader(getId(), new StateHeader((byte) 5))));
        } catch (Throwable th2) {
            this.log.error(this.local_addr + ": failed sending exception " + th.toString() + " to " + address);
        }
    }

    protected ThreadPoolExecutor createThreadPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, this.max_pool, this.pool_thread_keep_alive, TimeUnit.MILLISECONDS, new SynchronousQueue());
        ThreadFactory threadFactory = new ThreadFactory() { // from class: org.jgroups.protocols.pbcast.StreamingStateTransfer.1
            private final AtomicInteger thread_id = new AtomicInteger(1);

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return StreamingStateTransfer.this.getThreadFactory().newThread(runnable, "StreamingStateTransfer-sender-" + this.thread_id.getAndIncrement());
            }
        };
        threadPoolExecutor.setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(threadPoolExecutor.getRejectedExecutionHandler()));
        threadPoolExecutor.setThreadFactory(threadFactory);
        return threadPoolExecutor;
    }

    protected Address determineCoordinator() {
        synchronized (this.members) {
            for (Address address : this.members) {
                if (!this.local_addr.equals(address)) {
                    return address;
                }
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleViewChange(View view) {
        List<Address> members = view.getMembers();
        synchronized (this.members) {
            this.members.clear();
            this.members.addAll(members);
        }
        this.state_lock.lock();
        try {
            for (Address address : new HashSet(this.pending_state_transfers.keySet())) {
                if (!members.contains(address)) {
                    removeRequester(address);
                }
            }
        } finally {
            this.state_lock.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleStateReq(Address address) {
        if (address == null) {
            if (this.log.isErrorEnabled()) {
                this.log.error(this.local_addr + ": sender of STATE_REQ is null; ignoring state transfer request");
                return;
            }
            return;
        }
        Message message = new Message(address);
        StateHeader stateHeader = new StateHeader((byte) 2, null, isDigestNeeded() ? (Digest) this.down_prot.down(Event.GET_DIGEST_EVT) : null);
        modifyStateResponseHeader(stateHeader);
        message.putHeader(this.id, stateHeader);
        if (this.log.isDebugEnabled()) {
            this.log.debug(this.local_addr + ": responding to state requester " + address);
        }
        this.down_prot.down(new Event(1, message));
        if (this.stats) {
            this.num_state_reqs.incrementAndGet();
        }
        try {
            createStreamToRequester(address);
        } catch (Throwable th) {
            sendException(address, th);
        }
    }

    protected abstract void createStreamToRequester(Address address);

    protected abstract void createStreamToProvider(Address address, StateHeader stateHeader);

    protected void modifyStateResponseHeader(StateHeader stateHeader) {
    }

    void handleStateRsp(Address address, StateHeader stateHeader) {
        createStreamToProvider(address, stateHeader);
    }
}
