package io.daos;

import io.netty.buffershade4.ByteBuf;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/daos/DaosEventQueue.class */
public class DaosEventQueue {
    /** Handle obtained from {@code DaosClient.createEventQueue}; passed to the abort, poll and destroy calls. */
    private final long eqWrapperHdl;
    /** Name given to the constructor, or the constructing thread's name when none was given. */
    private final String threadName;
    /** Id of the thread that constructed this queue; {@code pollCompleted} asserts against it. */
    private final long threadId;
    /** Number of event slots in {@link #events}. */
    private final int nbrOfEvents;
    /** Event slots created in the constructor, indexed by event id. */
    protected final Event[] events;
    /**
     * Buffer whose memory address is handed to {@code DaosClient.pollCompleted}. It is read back as a
     * short count followed by that many short event ids.
     */
    private final ByteBuf completed;
    /** Slot index at which {@code acquireEvent} starts scanning for a free event. */
    private int nextEventIdx;
    /** Number of events acquired and not yet completed, aborted or returned. */
    private int nbrOfAcquired;
    /** Set to true by {@code release()} once the queue has been torn down. */
    protected boolean released;
    /** {@code System.currentTimeMillis()} of the last poll that completed at least one non-aborted event. */
    private long lastProgressed;
    /** Timeouts counted since the last progress; reset to 0 together with {@link #lastProgressed}. */
    private int nbrOfTimedOut;
    /** Completed attachments not handed to a caller yet, grouped by concrete class; nulled by {@code releaseMore()}. */
    private Map<Class<?>, List<Attachment>> attMap = new HashMap();
    // NOTE(review): not referenced by name in this file; presumably the origin of the literal 10L
    // used by pollCompleted for negative timeouts — confirm against the original source.
    private static final int DEFAULT_POLL_TIMEOUT_MS = 10;
    /** No-progress duration (ms) above which {@code checkProgress} throws; read from a system property in the static initializer. */
    private static final int DEFAULT_NO_PROGRESS_DURATION_ERROR;
    // NOTE(review): not referenced by name in this file; checkProgress compares against the literal 5.
    private static final int DEFAULT_NBR_OF_TIMEDOUT_WARN = 5;
    // NOTE(review): not referenced by name in this file; checkProgress compares against the literal 10.
    private static final int DEFAULT_NBR_OF_TIMEDOUT_ERROR = 10;
    /** Queue size used by {@code getInstance} when the caller passes a non-positive size. */
    private static int DEFAULT_NBR_OF_EVENTS;
    /** Queues created through {@code getInstance}, keyed by thread id. */
    private static final Map<Long, DaosEventQueue> EQ_MAP;
    private static final Logger log;
    // Decompiler rendering of the class's assertion switch; assigned in the static initializer.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/daos/DaosEventQueue$Attachment.class */
    /**
     * Payload carried by an {@link Event}. The queue hands completed attachments back to pollers
     * and releases whatever is left when the queue itself is released.
     */
    public interface Attachment {
        /**
         * Not called within this class.
         * NOTE(review): presumably associates this attachment with the given event — confirm in implementations.
         */
        void setEvent(Event event);

        /**
         * Not called within this class.
         * NOTE(review): presumably resets the attachment for another use — confirm in implementations.
         */
        void reuse();

        /** Invoked by {@link Event#complete()} when the event holding this attachment completes. */
        void ready();

        /**
         * @return true if {@link Event#putBack()} should keep this attachment on its event instead of
         *         clearing the reference
         */
        boolean alwaysBoundToEvt();

        /**
         * Not called within this class.
         * NOTE(review): presumably marks the attachment so that {@link #isDiscarded()} returns true — confirm.
         */
        void discard();

        /**
         * Checked when a completed attachment is not handed to the polling caller: a discarded
         * attachment is released immediately instead of being kept for a later poll.
         */
        boolean isDiscarded();

        /** Called for discarded attachments and for every remaining attachment when the queue is released. */
        void release();
    }

    /* loaded from: input_file:io/daos/DaosEventQueue$Event.class */
    /**
     * One slot of the enclosing queue. An event has a fixed id, remembers the handle of the queue
     * that created it, and optionally carries an {@link Attachment}.
     */
    public class Event {
        private final short id;
        private final long eqHandle;
        protected Attachment attachment;
        protected EventStatus status = EventStatus.FREE;

        /**
         * @param s id of this event within the enclosing queue
         */
        protected Event(short s) {
            id = s;
            eqHandle = DaosEventQueue.this.eqWrapperHdl;
        }

        /** @return id of this event */
        public short getId() {
            return id;
        }

        /** @return attachment currently set on this event, possibly null */
        public Attachment getAttachment() {
            return attachment;
        }

        /** @return handle of the queue that created this event */
        public long getEqHandle() {
            return eqHandle;
        }

        /**
         * Replaces the attachment of this event.
         *
         * @param attachment new attachment, may be null
         * @return the attachment that was set before the call, possibly null
         */
        public Attachment setAttachment(Attachment attachment) {
            final Attachment previous = this.attachment;
            this.attachment = attachment;
            return previous;
        }

        /**
         * Marks this event free again. The attachment reference is cleared unless the attachment
         * reports that it is always bound to its event.
         */
        public void putBack() {
            status = EventStatus.FREE;
            final Attachment bound = attachment;
            if (bound != null && !bound.alwaysBoundToEvt()) {
                attachment = null;
            }
        }

        /**
         * Signals the attachment (if any) that it is ready, then puts this event back.
         *
         * @return the attachment held when the call started, possibly null
         */
        public Attachment complete() {
            final Attachment finished = attachment;
            if (finished != null) {
                finished.ready();
            }
            putBack();
            return finished;
        }

        /**
         * Requests abortion of this event if it is in use. The status becomes ABORTED while the
         * request is made and falls back to FREE when the queue reports the abort did not take.
         *
         * @throws DaosIOException declared by the original signature
         */
        public void abort() throws DaosIOException {
            if (status == EventStatus.USING) {
                status = EventStatus.ABORTED;
                final boolean accepted = DaosEventQueue.this.abortEvent(this);
                if (!accepted) {
                    status = EventStatus.FREE;
                }
            }
        }
    }

    /* loaded from: input_file:io/daos/DaosEventQueue$EventStatus.class */
    /** Lifecycle state of an {@link Event}. */
    public enum EventStatus {
        /** Initial state; also set whenever an event is put back. Only free events can be acquired. */
        FREE,
        /** Set by {@code acquireEvent()} when the event is handed out. */
        USING,
        /** Set by {@link Event#abort()}; such events are put back by polling without completing their attachment. */
        ABORTED
    }

    protected DaosEventQueue(String str, int i) throws IOException {
        this.threadName = str == null ? Thread.currentThread().getName() : str;
        this.threadId = Thread.currentThread().getId();
        if (i > 32767) {
            throw new IllegalArgumentException("number of events should be no larger than 32767");
        }
        this.nbrOfEvents = i;
        this.eqWrapperHdl = DaosClient.createEventQueue(i);
        this.events = new Event[i];
        for (int i2 = 0; i2 < i; i2++) {
            this.events[i2] = createEvent((short) i2);
        }
        this.completed = BufferAllocator.objBufWithNativeOrder((i * 2) + 2);
        this.completed.writerIndex(this.completed.capacity());
    }

    /**
     * Factory for the event slots; called once per slot from the constructor.
     *
     * @param s id of the event to create
     * @return a new event bound to this queue
     */
    protected Event createEvent(short s) {
        final Event created = new Event(s);
        return created;
    }

    /**
     * Returns the event queue registered for the current thread, creating and registering one
     * if the thread has none yet.
     *
     * @param i number of events for a newly created queue; the configured default is used when
     *          not positive. Ignored when the thread already has a queue.
     * @return the current thread's event queue
     * @throws IOException if the queue cannot be created
     */
    public static DaosEventQueue getInstance(int i) throws IOException {
        final Long key = Thread.currentThread().getId();
        final DaosEventQueue existing = EQ_MAP.get(key);
        if (existing != null) {
            return existing;
        }
        final int size = i > 0 ? i : DEFAULT_NBR_OF_EVENTS;
        final DaosEventQueue created = new DaosEventQueue(null, size);
        EQ_MAP.put(key, created);
        return created;
    }

    /** @return total number of event slots in this queue */
    public int getNbrOfEvents() {
        return nbrOfEvents;
    }

    /**
     * Hands out the first free event found by a circular scan that starts at the slot after the
     * one acquired last time.
     *
     * @return the acquired event, now marked USING, or null when no event is free
     */
    public Event acquireEvent() {
        if (nbrOfAcquired == nbrOfEvents) {
            return null;
        }
        final int start = nextEventIdx;
        // visit every slot exactly once, wrapping around at the end of the array
        for (int offset = 0; offset < nbrOfEvents; offset++) {
            final int idx = (start + offset) % nbrOfEvents;
            final Event candidate = events[idx];
            if (candidate.status == EventStatus.FREE) {
                nextEventIdx = (idx + 1) % nbrOfEvents;
                candidate.status = EventStatus.USING;
                nbrOfAcquired++;
                return candidate;
            }
        }
        return null;
    }

    /**
     * Acquires an event, polling for completed events until one becomes free.
     *
     * @param j    maximum time to wait in milliseconds; checked on every tenth poll round
     * @param list receives attachments completed while polling
     * @param cls  attachment class whose detained attachments are considered first
     * @param set  if not null, only attachments contained in it are added to {@code list}
     * @return an acquired event, never null
     * @throws TimedOutException if no event could be acquired within the allowed time, or if
     *                           the queue has made no progress for too long
     * @throws IOException       if polling fails
     */
    public Event acquireEventBlocking(long j, List<Attachment> list, Class<? extends Attachment> cls, Set<? extends Attachment> set) throws IOException {
        Event event = acquireEvent();
        if (event != null) {
            return event;
        }
        checkProgress();
        final long startMs = System.currentTimeMillis();
        for (int round = 0; event == null; round++) {
            final boolean timeToCheck = round % 10 == 0;
            if (timeToCheck && System.currentTimeMillis() - startMs > j) {
                nbrOfTimedOut++;
                throw new TimedOutException("failed to acquire event after waiting more than " + j + " ms");
            }
            // poll timeout grows with the number of rounds, capped at 100
            pollCompleted(list, cls, set, Math.min(round, 100));
            event = acquireEvent();
        }
        return event;
    }

    /**
     * Inspects the timeout counter. More than 5 timeouts are logged as a warning. More than 10
     * timeouts combined with a too long period since the last progress raise an exception.
     *
     * @throws TimedOutException if the queue made no progress for longer than the configured duration
     */
    private void checkProgress() throws TimedOutException {
        final int timedOut = nbrOfTimedOut;
        if (timedOut <= 5) {
            return;
        }
        if (timedOut > 10) {
            final long idleMs = System.currentTimeMillis() - lastProgressed;
            if (idleMs > DEFAULT_NO_PROGRESS_DURATION_ERROR) {
                throw new TimedOutException("too long duration without progress. number of timedout: " + nbrOfTimedOut + ", duration: " + idleMs + ", nbrOfAcquired: " + nbrOfAcquired + ", nbrOfEvents: " + nbrOfEvents);
            }
        }
        log.warn("number of timedout: " + nbrOfTimedOut);
    }

    /** @return true while at least one acquired event has not been completed or returned */
    public boolean hasPendingEvent() {
        return nbrOfAcquired > 0;
    }

    /**
     * Polls until no acquired event is left. The first poll, and every poll following one that
     * reduced the number of acquired events, uses timeout 0; otherwise 10 is used.
     *
     * @param j    maximum total time to wait in milliseconds
     * @param cls  attachment class passed on to polling
     * @param list receives the completed attachments
     * @throws TimedOutException if events are still pending after {@code j} milliseconds
     * @throws IOException       if polling fails
     */
    public void waitForCompletion(long j, Class<? extends Attachment> cls, List<Attachment> list) throws IOException {
        final long begin = System.currentTimeMillis();
        long pollTimeout = 0;
        while (nbrOfAcquired > 0) {
            final int before = nbrOfAcquired;
            pollCompleted(list, cls, null, pollTimeout);
            if (nbrOfAcquired == before) {
                pollTimeout = 10;
            } else {
                pollTimeout = 0;
            }
            final long waited = System.currentTimeMillis() - begin;
            if (waited > j) {
                nbrOfTimedOut++;
                throw new TimedOutException("no completion after waiting more than " + j + ", nbrOfAcquired: " + nbrOfAcquired + ", total: " + nbrOfEvents);
            }
        }
    }

    /**
     * Looks up an event slot by index.
     *
     * @param i index of the event, from 0 (inclusive) to the number of events (exclusive)
     * @return the event stored at the given index
     * @throws IllegalArgumentException if {@code i} is negative or not smaller than the number of events
     */
    public Event getEvent(int i) {
        // reject negative indexes with the same exception type as the upper bound,
        // instead of letting the array access fail
        if (i < 0) {
            throw new IllegalArgumentException("event index " + i + " should not be negative");
        }
        if (i >= this.nbrOfEvents) {
            throw new IllegalArgumentException("event index " + i + " should not exceed number of total events, " + this.nbrOfEvents);
        }
        return this.events[i];
    }

    /**
     * Asks the native client to abort the given event on this queue.
     *
     * @param event event to abort
     * @return the result reported by {@code DaosClient.abortEvent}
     */
    public boolean abortEvent(Event event) {
        final short eventId = event.getId();
        return DaosClient.abortEvent(eqWrapperHdl, eventId);
    }

    /**
     * Gives an acquired event back to the queue without completing it: the event is put back
     * and the acquired counter is decremented.
     *
     * @param event event to return
     */
    public void returnEvent(Event event) {
        event.putBack();
        nbrOfAcquired -= 1;
    }

    /**
     * Polls completed events, expecting as many attachments as {@code set} contains, or as many
     * as the queue has events when {@code set} is null.
     *
     * @param list receives the completed attachments
     * @param cls  attachment class whose detained attachments are considered first
     * @param set  if not null, only attachments contained in it are added to {@code list}
     * @param j    poll timeout forwarded to the five-argument overload
     * @return the value returned by the five-argument overload
     * @throws IOException if polling fails
     */
    public int pollCompleted(List<Attachment> list, Class<? extends Attachment> cls, Set<? extends Attachment> set, long j) throws IOException {
        final int expected;
        if (set == null) {
            expected = nbrOfEvents;
        } else {
            expected = set.size();
        }
        return pollCompleted(list, cls, set, expected, j);
    }

    /**
     * Collects completed attachments. Attachments detained by earlier polls are moved to
     * {@code list} first; if they do not satisfy the expected count, the native queue is polled.
     * Aborted events are put back without completing their attachment. Completed attachments
     * that are not added to {@code list} are detained for a later poll, or released if discarded.
     *
     * @param list receives the completed attachments
     * @param cls  attachment class whose detained attachments are considered first
     * @param set  if not null, only attachments contained in it are added to {@code list}
     * @param i    expected number of attachments
     * @param j    poll timeout passed to the native poll; 10 is used when negative
     * @return the number of detained attachments moved when they alone satisfy {@code i};
     *         otherwise the number of non-aborted events completed by the native poll, or 0 if
     *         there were none
     * @throws IOException if polling fails
     */
    public int pollCompleted(List<Attachment> list, Class<? extends Attachment> cls, Set<? extends Attachment> set, int i, long j) throws IOException {
        // decompiled form of an assert: the queue may only be polled by the thread that created it
        if (!$assertionsDisabled && Thread.currentThread().getId() != this.threadId) {
            // report the owner's id as the expected one (the current thread's id was printed twice before)
            throw new AssertionError("current thread " + Thread.currentThread().getId() + "(" + Thread.currentThread().getName() + "), is not expected " + this.threadId + "(" + this.threadName + ")");
        }
        int moved = moveAttachment(list, cls, set, i);
        int remaining = i - moved;
        if (remaining == 0) {
            return moved;
        }
        while (this.nbrOfAcquired > 0) {
            int aborted = 0;
            DaosClient.pollCompleted(this.eqWrapperHdl, this.completed.memoryAddress(), this.nbrOfAcquired, j < 0 ? 10L : j);
            // buffer layout: a short count followed by that many short event ids
            this.completed.readerIndex(0);
            short nbrCompleted = this.completed.readShort();
            for (int n = 0; n < nbrCompleted; n++) {
                Event event = this.events[this.completed.readShort()];
                if (event.status == EventStatus.ABORTED) {
                    aborted++;
                    event.putBack();
                } else {
                    Attachment attachment = event.complete();
                    // NOTE(review): list.size() also counts entries that were in the list before this
                    // loop (including ones just moved from the detained pool), while remaining
                    // excludes the moved ones — confirm this cap is intended.
                    if ((list.size() < remaining) && (set == null || set.contains(attachment))) {
                        list.add(attachment);
                    } else {
                        detainAttachment(attachment);
                    }
                }
            }
            this.nbrOfAcquired -= nbrCompleted;
            int nbrNotAborted = nbrCompleted - aborted;
            if (nbrNotAborted > 0) {
                this.lastProgressed = System.currentTimeMillis();
                this.nbrOfTimedOut = 0;
                return nbrNotAborted;
            }
            // nothing completed at all: give up; only aborted events came back: poll again
            if (aborted == 0) {
                return nbrNotAborted;
            }
        }
        return 0;
    }

    /**
     * Moves detained attachments registered under {@code cls} into {@code list}.
     *
     * @param list destination of the moved attachments
     * @param cls  key of the detained attachments to look at
     * @param set  if not null, only attachments contained in it are moved
     * @param i    maximum number of attachments to move
     * @return number of attachments moved
     */
    private int moveAttachment(List<Attachment> list, Class<? extends Attachment> cls, Set<? extends Attachment> set, int i) {
        final List<Attachment> detained = attMap.get(cls);
        if (detained == null || detained.isEmpty()) {
            return 0;
        }
        int moved = 0;
        final Iterator<Attachment> it = detained.iterator();
        while (moved < i && it.hasNext()) {
            final Attachment candidate = it.next();
            if (set != null && !set.contains(candidate)) {
                continue;
            }
            list.add(candidate);
            it.remove();
            moved++;
        }
        return moved;
    }

    /**
     * Keeps a completed attachment that was not handed to the caller so a later poll can pick it
     * up. Discarded attachments are released immediately instead of being kept.
     *
     * @param attachment completed attachment; null (an event completed without attachment) is ignored
     */
    private void detainAttachment(Attachment attachment) {
        // Event.complete() returns null for events without attachment; there is nothing to keep then
        if (attachment == null) {
            return;
        }
        if (attachment.isDiscarded()) {
            attachment.release();
            return;
        }
        // detained attachments are grouped by their concrete class
        this.attMap.computeIfAbsent(attachment.getClass(), key -> new LinkedList<>()).add(attachment);
    }

    /** @return number of events currently acquired from this queue */
    public int getNbrOfAcquired() {
        return nbrOfAcquired;
    }

    /** @return handle of the native event queue wrapped by this object */
    public long getEqWrapperHdl() {
        return eqWrapperHdl;
    }

    /**
     * Destroys the native event queue and frees everything this object holds: the attachments
     * (via {@link #releaseMore()}) and the buffer used for polling. Calling it again after a
     * successful destroy has no effect.
     *
     * @throws IOException if the native queue cannot be destroyed; nothing is freed in that case
     *                     and the call may be repeated
     */
    public synchronized void release() throws IOException {
        if (this.released) {
            return;
        }
        DaosClient.destroyEventQueue(this.eqWrapperHdl);
        try {
            releaseMore();
        } finally {
            // The native queue is gone at this point, so mark the object released even if an
            // attachment failed to release; otherwise a retry would destroy the handle twice.
            this.released = true;
            // the poll buffer allocated in the constructor was never freed before
            this.completed.release();
        }
    }

    /**
     * Releases the attachment of every event slot and every detained attachment, then drops the
     * map of detained attachments.
     */
    protected void releaseMore() {
        for (int idx = 0; idx < events.length; idx++) {
            final Attachment bound = events[idx].getAttachment();
            if (bound != null) {
                bound.release();
            }
        }
        for (List<Attachment> detained : attMap.values()) {
            for (Attachment attachment : detained) {
                attachment.release();
            }
        }
        attMap = null;
    }

    /**
     * Releases the event queue of the current thread and removes it from the registry.
     *
     * @param j              id of the thread owning the queue; must be the current thread
     * @param daosEventQueue queue to destroy; must be the one registered for the thread
     * @throws UnsupportedOperationException if {@code j} is not the id of the current thread
     * @throws IllegalArgumentException      if the given queue is not the registered one
     * @throws IOException                   if releasing the queue fails
     */
    public static void destroy(long j, DaosEventQueue daosEventQueue) throws IOException {
        final long currentId = Thread.currentThread().getId();
        if (currentId != j) {
            throw new UnsupportedOperationException("Cannot destroy EQ belongs to other thread, id: " + j);
        }
        final Long key = j;
        final DaosEventQueue registered = EQ_MAP.get(key);
        if (registered != daosEventQueue) {
            throw new IllegalArgumentException("given EQ is not same as EQ of current thread");
        }
        daosEventQueue.release();
        EQ_MAP.remove(key);
    }

    /**
     * Releases every registered event queue and empties the registry. A failure to release one
     * queue is logged and does not stop the remaining queues from being released.
     */
    public static void destroyAll() {
        for (DaosEventQueue eq : EQ_MAP.values()) {
            try {
                eq.release();
            } catch (Throwable th) {
                log.error("failed to destroy event queue in thread, " + eq.threadName, th);
            }
        }
        EQ_MAP.clear();
    }

    /** @return thread name recorded when this queue was created */
    public String getThreadName() {
        return threadName;
    }

    /** @return id of the thread that created this queue */
    public long getThreadId() {
        return threadId;
    }

    // Class initialization: assertion switch, no-progress timeout, queue registry, logger and the
    // default number of events per queue (system property first, then environment variable, then 128).
    static {
        $assertionsDisabled = !DaosEventQueue.class.desiredAssertionStatus();
        DEFAULT_NO_PROGRESS_DURATION_ERROR = Integer.parseInt(System.getProperty(Constants.CFG_DAOS_TIMEOUT, Constants.DEFAULT_DAOS_TIMEOUT_MS));
        EQ_MAP = new ConcurrentHashMap<>();
        log = LoggerFactory.getLogger(DaosEventQueue.class);
        String configured = System.getProperty(Constants.CFG_NUMBER_OF_EVENTS_PER_EQ);
        if (configured == null) {
            configured = System.getenv(Constants.CFG_NUMBER_OF_EVENTS_PER_EQ);
        }
        int nbrOfEvents = 128;
        if (configured != null) {
            nbrOfEvents = Integer.parseInt(configured);
        }
        if (nbrOfEvents <= 0) {
            log.error("got non-positive number of events per EQ, " + nbrOfEvents + ", check your property or env config, " + Constants.CFG_NUMBER_OF_EVENTS_PER_EQ + ". set to default value, 128");
            nbrOfEvents = 128;
        }
        DEFAULT_NBR_OF_EVENTS = nbrOfEvents;
    }
}
