/*
 * Decompiled with CFR 0.152.
 */
package io.questdb.cairo.pool;

import io.questdb.MessageBus;
import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.DefaultLifecycleManager;
import io.questdb.cairo.LifecycleManager;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.pool.AbstractPool;
import io.questdb.cairo.pool.PoolConstants;
import io.questdb.cairo.pool.ResourcePool;
import io.questdb.cairo.pool.ex.EntryLockedException;
import io.questdb.cairo.pool.ex.EntryUnavailableException;
import io.questdb.cairo.pool.ex.PoolClosedException;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.std.ConcurrentHashMap;
import io.questdb.std.Misc;
import io.questdb.std.Unsafe;
import io.questdb.std.microtime.MicrosecondClock;
import io.questdb.std.str.Path;
import java.util.Iterator;
import org.jetbrains.annotations.Nullable;

public class WriterPool
extends AbstractPool
implements ResourcePool<TableWriter> {
    private static final Log LOG = LogFactory.getLog(WriterPool.class);
    private static final long ENTRY_OWNER = Unsafe.getFieldOffset(Entry.class, "owner");
    private final ConcurrentHashMap<Entry> entries = new ConcurrentHashMap();
    private final CairoConfiguration configuration;
    private final Path path = new Path();
    private final MicrosecondClock clock;
    private final CharSequence root;
    @Nullable
    private final MessageBus messageBus;

    public WriterPool(CairoConfiguration configuration, @Nullable MessageBus messageBus) {
        super(configuration, configuration.getInactiveWriterTTL());
        this.configuration = configuration;
        this.messageBus = messageBus;
        this.clock = configuration.getMicrosecondClock();
        this.root = configuration.getRoot();
        this.notifyListener(Thread.currentThread().getId(), null, (short)23);
    }

    @Override
    public TableWriter get(CharSequence tableName) {
        this.checkClosed();
        long thread = Thread.currentThread().getId();
        Entry e = this.entries.get(tableName);
        if (e == null) {
            e = new Entry(this.clock.getTicks());
            Entry other = this.entries.putIfAbsent(tableName, e);
            if (other == null) {
                return this.createWriter(tableName, e, thread);
            }
            e = other;
        }
        long owner = e.owner;
        if (Unsafe.cas((Object)e, ENTRY_OWNER, -1L, thread)) {
            if (e.writer == null) {
                return this.createWriter(tableName, e, thread);
            }
            return this.checkClosedAndGetWriter(tableName, e);
        }
        if (e.owner == thread) {
            if (e.lockFd != -1L) {
                throw EntryLockedException.INSTANCE;
            }
            if (e.ex != null) {
                this.notifyListener(thread, tableName, (short)21);
                throw e.ex;
            }
            return this.checkClosedAndGetWriter(tableName, e);
        }
        LOG.error().$("busy [table=`").utf8(tableName).$("`, owner=").$(owner).$(']').$();
        throw EntryUnavailableException.INSTANCE;
    }

    public boolean lock(CharSequence tableName) {
        this.checkClosed();
        long thread = Thread.currentThread().getId();
        Entry e = this.entries.get(tableName);
        if (e == null) {
            e = new Entry(this.clock.getTicks());
            Entry other = this.entries.putIfAbsent(tableName, e);
            if (other == null) {
                if (this.lockAndNotify(thread, e, tableName)) {
                    return true;
                }
                this.entries.remove(tableName);
                return false;
            }
            e = other;
        }
        if (Unsafe.cas((Object)e, ENTRY_OWNER, -1L, thread)) {
            this.closeWriter(thread, e, (short)19, 2);
            return this.lockAndNotify(thread, e, tableName);
        }
        LOG.error().$("could not lock, busy [table=`").utf8(tableName).$("`, owner=").$(e.owner).$(", thread=").$(thread).$(']').$();
        this.notifyListener(thread, tableName, (short)7);
        return false;
    }

    public int getBusyCount() {
        int count = 0;
        for (Entry e : this.entries.values()) {
            if (e.owner == -1L) continue;
            ++count;
        }
        return count;
    }

    private TableWriter checkClosedAndGetWriter(CharSequence tableName, Entry e) {
        if (this.isClosed()) {
            LOG.info().$('\'').utf8(tableName).$("' born free").$();
            return e.goodby();
        }
        return this.logAndReturn(e, (short)11);
    }

    public int size() {
        return this.entries.size();
    }

    public void unlock(CharSequence name) {
        this.unlock(name, null);
    }

    public void unlock(CharSequence name, @Nullable TableWriter writer) {
        long thread = Thread.currentThread().getId();
        Entry e = this.entries.get(name);
        if (e == null) {
            this.notifyListener(thread, name, (short)9);
            return;
        }
        if (e.owner == thread) {
            if (e.writer != null) {
                this.notifyListener(thread, name, (short)9);
                throw CairoException.instance(0).put("Writer ").put(name).put(" is not locked");
            }
            if (writer == null) {
                this.entries.remove(name);
                if (e.lockFd != -1L) {
                    this.ff.close(e.lockFd);
                    TableUtils.lockName(this.path.of(this.root).concat(name));
                    if (!this.ff.remove(this.path)) {
                        LOG.error().$("could not remove [file=").$(this.path).$(']').$();
                    }
                }
            } else {
                e.writer = writer;
                writer.setLifecycleManager(e);
                writer.transferLock(e.lockFd);
                e.lockFd = -1L;
                Unsafe.getUnsafe().putOrderedLong(e, ENTRY_OWNER, -1L);
            }
        } else {
            this.notifyListener(thread, name, (short)12);
            throw CairoException.instance(0).put("Not lock owner of ").put(name);
        }
        this.notifyListener(thread, name, (short)8);
    }

    private void checkClosed() {
        if (this.isClosed()) {
            LOG.info().$("is closed").$();
            throw PoolClosedException.INSTANCE;
        }
    }

    @Override
    protected void closePool() {
        super.closePool();
        Misc.free(this.path);
        LOG.info().$("closed").$();
    }

    @Override
    protected boolean releaseAll(long deadline) {
        long thread = Thread.currentThread().getId();
        boolean removed = false;
        int reason = deadline == Long.MAX_VALUE ? 1 : 3;
        Iterator<Entry> iterator = this.entries.values().iterator();
        while (iterator.hasNext()) {
            Entry e = iterator.next();
            if (deadline > e.lastReleaseTime && e.owner == -1L) {
                if (!Unsafe.cas((Object)e, ENTRY_OWNER, -1L, thread)) continue;
                this.closeWriter(thread, e, (short)17, reason);
                iterator.remove();
                removed = true;
                continue;
            }
            if (e.lockFd != -1L) {
                if (!this.ff.close(e.lockFd)) continue;
                e.lockFd = -1L;
                iterator.remove();
                removed = true;
                continue;
            }
            if (e.ex == null) continue;
            LOG.info().$("purging entry for failed to allocate writer").$();
            iterator.remove();
            removed = true;
        }
        return removed;
    }

    private void closeWriter(long thread, Entry e, short ev, int reason) {
        TableWriter w = e.writer;
        if (w != null) {
            CharSequence name = e.writer.getName();
            w.setLifecycleManager(DefaultLifecycleManager.INSTANCE);
            w.close();
            e.writer = null;
            LOG.info().$("closed [table=`").utf8(name).$("`, reason=").$(PoolConstants.closeReasonText(reason)).$(", by=").$(thread).$(']').$();
            this.notifyListener(thread, name, ev);
        }
    }

    int countFreeWriters() {
        int count = 0;
        for (Entry e : this.entries.values()) {
            if (e.owner == -1L) {
                ++count;
                continue;
            }
            LOG.info().$("'").utf8(e.writer.getName()).$("' is still busy [owner=").$(e.owner).$(']').$();
        }
        return count;
    }

    private TableWriter createWriter(CharSequence name, Entry e, long thread) {
        try {
            this.checkClosed();
            LOG.info().$("open [table=`").utf8(name).$("`, thread=").$(thread).$(']').$();
            e.writer = new TableWriter(this.configuration, name, this.messageBus, true, e);
            return this.logAndReturn(e, (short)10);
        }
        catch (CairoException ex) {
            LOG.error().$("could not open [table=`").utf8(name).$("`, thread=").$(e.owner).$(']').$();
            e.ex = ex;
            this.notifyListener(e.owner, name, (short)14);
            throw ex;
        }
    }

    private boolean lockAndNotify(long thread, Entry e, CharSequence tableName) {
        TableUtils.lockName(this.path.of(this.root).concat(tableName));
        e.lockFd = TableUtils.lock(this.ff, this.path);
        if (e.lockFd == -1L) {
            LOG.error().$("could not lock [table=`").utf8(tableName).$("`, thread=").$(thread).$(']').$();
            e.owner = -1L;
            return false;
        }
        LOG.info().$("locked [table=`").utf8(tableName).$("`, thread=").$(thread).$(']').$();
        this.notifyListener(thread, tableName, (short)6);
        return true;
    }

    private TableWriter logAndReturn(Entry e, short event) {
        LOG.info().$(">> [table=`").utf8(e.writer.getName()).$("`, thread=").$(e.owner).$(']').$();
        this.notifyListener(e.owner, e.writer.getName(), event);
        return e.writer;
    }

    private boolean returnToPool(Entry e) {
        CharSequence name = e.writer.getName();
        long thread = Thread.currentThread().getId();
        if (e.owner != -1L) {
            LOG.info().$("<< [table=`").utf8(name).$("`, thread=").$(thread).$(']').$();
            if (this.isClosed()) {
                LOG.info().$("allowing '").utf8(name).$("' to close [thread=").$(e.owner).$(']').$();
                this.entries.remove(name);
                this.notifyListener(thread, name, (short)2);
                return false;
            }
            e.owner = -1L;
            e.lastReleaseTime = this.configuration.getMicrosecondClock().getTicks();
            this.notifyListener(thread, name, (short)1);
        } else {
            LOG.error().$("orphaned [table=`").utf8(name).$("`]").$();
            this.notifyListener(thread, name, (short)3);
        }
        return true;
    }

    private class Entry
    implements LifecycleManager {
        private volatile long owner = Thread.currentThread().getId();
        private TableWriter writer;
        private volatile long lastReleaseTime;
        private CairoException ex = null;
        private volatile long lockFd = -1L;

        public Entry(long lastReleaseTime) {
            this.lastReleaseTime = lastReleaseTime;
        }

        @Override
        public boolean close() {
            return !WriterPool.this.returnToPool(this);
        }

        public TableWriter goodby() {
            TableWriter w = this.writer;
            if (this.writer != null) {
                this.writer.setLifecycleManager(DefaultLifecycleManager.INSTANCE);
                this.writer = null;
            }
            return w;
        }
    }
}

