/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.wire.channel.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import net.openhft.affinity.AffinityStrategy;
import net.openhft.affinity.AffinityThreadFactory;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIllegalStateException;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.channel.EventPoller;
import net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel;
import net.openhft.chronicle.wire.channel.impl.TCPChronicleChannel;
import net.openhft.chronicle.wire.channel.impl.WireExchanger;

public class BufferedChronicleChannel
extends DelegateChronicleChannel {
    private static final boolean ALLOW_AFFINITY = Jvm.getBoolean("useAffinity", true);
    private final Pauser pauser;
    private final WireExchanger exchanger = new WireExchanger();
    private final ExecutorService bgWriter;
    private volatile EventPoller eventPoller;

    public BufferedChronicleChannel(TCPChronicleChannel channel, Pauser pauser) {
        super(channel);
        this.pauser = pauser;
        String desc = channel.connectionCfg().initiator() ? "init" : "accp";
        String writer = desc + "-writer";
        ThreadFactory factory = ALLOW_AFFINITY && pauser.isBusy() ? new AffinityThreadFactory(writer, true, new AffinityStrategy[0]) : new NamedThreadFactory(writer, true);
        this.bgWriter = Executors.newSingleThreadExecutor(factory);
        this.bgWriter.submit(this::bgWrite);
    }

    @Override
    public EventPoller eventPoller() {
        return this.eventPoller;
    }

    @Override
    public BufferedChronicleChannel eventPoller(EventPoller eventPoller) {
        if (this.isClosed()) {
            throw new ClosedIllegalStateException(this.getClass().getName() + " closed for " + Thread.currentThread().getName());
        }
        this.eventPoller = eventPoller;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void bgWrite() {
        try {
            TCPChronicleChannel channel = (TCPChronicleChannel)this.channel;
            while (!this.isClosing()) {
                channel.checkConnected();
                Wire wire = this.exchanger.acquireConsumer();
                if (wire.bytes().isEmpty()) {
                    EventPoller eventPoller = this.eventPoller();
                    boolean idle = eventPoller == null || !eventPoller.onPoll(this);
                    this.exchanger.releaseConsumer();
                    if (!idle) continue;
                    this.pauser.pause();
                    continue;
                }
                assert (TCPChronicleChannel.validateHeader(wire.bytes().peekVolatileInt()));
                this.pauser.reset();
                channel.flushOut(wire);
                this.exchanger.releaseConsumer();
            }
        }
        catch (Throwable t) {
            Thread.yield();
            if (!this.isClosing() && !this.channel.isClosing()) {
                Jvm.warn().on(this.getClass(), "bgWriter died", t);
            }
        }
        finally {
            this.bgWriter.shutdown();
            Closeable.closeQuietly((Object)this.eventPoller());
        }
    }

    @Override
    public DocumentContext writingDocument(boolean metaData) throws UnrecoverableTimeoutException {
        return this.exchanger.writingDocument(metaData);
    }

    @Override
    public DocumentContext acquireWritingDocument(boolean metaData) throws UnrecoverableTimeoutException {
        return this.exchanger.acquireWritingDocument(metaData);
    }

    @Override
    public WireOut acquireProducer() {
        return this.exchanger.acquireProducer();
    }

    @Override
    public void releaseProducer() {
        this.exchanger.releaseProducer();
    }

    @Override
    public void close() {
        super.close();
        Closeable.closeQuietly(this.eventPoller, this.exchanger);
    }
}

