/*
 * Decompiled with CFR 0.152.
 */
package com.emc.mongoose.storage.driver.coop.netty;

import com.emc.mongoose.base.Exceptions;
import com.emc.mongoose.base.config.IllegalConfigurationException;
import com.emc.mongoose.base.data.DataInput;
import com.emc.mongoose.base.item.DataItem;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.item.op.OpType;
import com.emc.mongoose.base.item.op.Operation;
import com.emc.mongoose.base.item.op.composite.data.CompositeDataOperation;
import com.emc.mongoose.base.item.op.data.DataOperation;
import com.emc.mongoose.base.logging.LogContextThreadFactory;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.logging.Loggers;
import com.emc.mongoose.storage.driver.coop.CoopStorageDriverBase;
import com.emc.mongoose.storage.driver.coop.netty.NettyStorageDriver;
import com.emc.mongoose.storage.driver.coop.netty.SslUtil;
import com.emc.mongoose.storage.driver.coop.netty.data.DataItemFileRegion;
import com.emc.mongoose.storage.driver.coop.netty.data.SeekableByteChannelChunkedNioStream;
import com.github.akurilov.commons.collection.Range;
import com.github.akurilov.commons.concurrent.ThreadUtil;
import com.github.akurilov.commons.system.SizeInBytes;
import com.github.akurilov.confuse.Config;
import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl;
import com.github.akurilov.netty.connection.pool.NonBlockingConnPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SeekableByteChannel;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;

public abstract class NettyStorageDriverBase<I extends Item, O extends Operation<I>>
extends CoopStorageDriverBase<I, O>
implements NettyStorageDriver<I, O>,
ChannelPoolHandler {
    // Simple class name; stored in the logging thread context under the "class_name" key.
    private static final String CLS_NAME = NettyStorageDriverBase.class.getSimpleName();
    // Netty event loop group instantiated reflectively in the constructor; shut down in doStop().
    private final EventLoopGroup ioExecutor;
    // Configured node addresses; the constructor appends ":<storageNodePort>" to entries without a ':'.
    protected final String[] storageNodeAddrs;
    // Bootstrap configured in the constructor with the socket options; used when creating the connection pool.
    protected final Bootstrap bootstrap;
    // Value of the "net-node-port" configuration entry.
    protected final int storageNodePort;
    // Value of the "net-node-connAttemptsLimit" configuration entry; passed to the connection pool.
    protected final int connAttemptsLimit;
    // Configured "net-timeoutMilliSec" if positive, otherwise Integer.MAX_VALUE.
    protected final int netTimeoutMilliSec;
    // Socket channel implementation class resolved by name for the selected transport.
    private final Class<SocketChannel> socketChannelCls;
    // Connection pool created at the end of the constructor via createConnectionPool().
    private final NonBlockingConnPool connPool;
    // Value of the "net-ssl" configuration entry; selects SSL handlers and the data write strategy.
    private final boolean sslFlag;
    // Listener bound to sendFullRequestComplete(ChannelFuture).
    protected final ChannelFutureListener reqSentCallback = this::sendFullRequestComplete;

    /**
     * Reads the network configuration, instantiates the I/O executor and the socket channel
     * class for the selected transport, configures the shared {@link Bootstrap} and finally
     * creates the connection pool.
     *
     * <p>When no transport is configured, the first available of EPOLL, KQUEUE and NIO is used.
     *
     * @param stepId the load step identifier (forwarded to the superclass, used for logging)
     * @param itemDataInput forwarded to the superclass
     * @param storageConfig the storage configuration; its "net" section and the
     *     "driver-threads" value are read here
     * @param verifyFlag forwarded to the superclass
     * @param batchSize forwarded to the superclass
     * @throws IllegalConfigurationException declared for the superclass constructor
     * @throws InterruptedException declared for the superclass constructor
     * @throws AssertionError if the transport classes cannot be loaded or instantiated
     */
    protected NettyStorageDriverBase(String stepId, DataInput itemDataInput, Config storageConfig, boolean verifyFlag, int batchSize) throws IllegalConfigurationException, InterruptedException {
        super(stepId, itemDataInput, storageConfig, verifyFlag, batchSize);
        final Config netConfig = storageConfig.configVal("net");
        this.sslFlag = netConfig.boolVal("ssl");
        if (this.sslFlag) {
            Loggers.MSG.info("{}: SSL/TLS is enabled", stepId);
        }
        // a non-positive configured timeout means "no limit"
        final int configuredTimeout = netConfig.intVal("timeoutMilliSec");
        this.netTimeoutMilliSec = configuredTimeout > 0 ? configuredTimeout : Integer.MAX_VALUE;
        final Config nodeConfig = netConfig.configVal("node");
        this.storageNodePort = nodeConfig.intVal("port");
        this.connAttemptsLimit = nodeConfig.intVal("connAttemptsLimit");
        final String[] configuredAddrs = nodeConfig.listVal("addrs").toArray(new String[0]);
        this.storageNodeAddrs = new String[configuredAddrs.length];
        for (int i = 0; i < configuredAddrs.length; ++i) {
            // append the default port to any address which doesn't specify its own
            final String addr = configuredAddrs[i];
            this.storageNodeAddrs[i] = addr.contains(":") ? addr : addr + ":" + this.storageNodePort;
        }
        final int confWorkerCount = storageConfig.intVal("driver-threads");
        final int workerCount = confWorkerCount < 1 ? ThreadUtil.getHardwareThreadCount() : confWorkerCount;
        final String transportConfig = netConfig.stringVal("transport");
        final NettyStorageDriver.Transport transportKey;
        if (transportConfig == null || transportConfig.isEmpty()) {
            if (Epoll.isAvailable()) {
                transportKey = NettyStorageDriver.Transport.EPOLL;
            } else if (KQueue.isAvailable()) {
                transportKey = NettyStorageDriver.Transport.KQUEUE;
            } else {
                transportKey = NettyStorageDriver.Transport.NIO;
            }
        } else {
            // locale-independent upper-casing, otherwise e.g. "nio" doesn't match under a Turkish locale
            transportKey = NettyStorageDriver.Transport.valueOf(transportConfig.toUpperCase(java.util.Locale.ROOT));
        }
        try {
            final String ioExecutorClsName = (String) IO_EXECUTOR_IMPLS.get(transportKey);
            final Class<?> transportCls = Class.forName(ioExecutorClsName);
            this.ioExecutor = (EventLoopGroup) transportCls
                .getConstructor(Integer.TYPE, ThreadFactory.class)
                .newInstance(workerCount, new LogContextThreadFactory("ioWorker", true));
            Loggers.MSG.info("{}: use {} I/O workers", this.toString(), workerCount);
            final int ioRatio = netConfig.intVal("ioRatio");
            try {
                // the I/O ratio setter is looked up reflectively; a failure is logged, not fatal
                final Method setIoRatioMethod = transportCls.getMethod("setIoRatio", Integer.TYPE);
                setIoRatioMethod.invoke(this.ioExecutor, ioRatio);
            } catch (ReflectiveOperationException e) {
                LogUtil.exception(Level.ERROR, e, "Failed to set the I/O ratio");
            }
        } catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
        final String socketChannelClsName = (String) SOCKET_CHANNEL_IMPLS.get(transportKey);
        try {
            // Class.forName yields Class<?>; the name comes from the socket channel implementations map
            @SuppressWarnings("unchecked")
            final Class<SocketChannel> resolvedChannelCls = (Class<SocketChannel>) Class.forName(socketChannelClsName);
            this.socketChannelCls = resolvedChannelCls;
        } catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
        this.bootstrap = new Bootstrap().group(this.ioExecutor).channel(this.socketChannelCls);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuredTimeout);
        this.bootstrap.option(ChannelOption.WRITE_SPIN_COUNT, 1);
        // the socket buffer sizes are applied only if configured with a positive value
        final int rcvBufSize = netConfig.intVal("rcvBuf");
        if (rcvBufSize > 0) {
            this.bootstrap.option(ChannelOption.SO_RCVBUF, rcvBufSize);
        }
        final int sndBufSize = netConfig.intVal("sndBuf");
        if (sndBufSize > 0) {
            this.bootstrap.option(ChannelOption.SO_SNDBUF, sndBufSize);
        }
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, netConfig.boolVal("keepAlive"));
        this.bootstrap.option(ChannelOption.SO_LINGER, netConfig.intVal("linger"));
        this.bootstrap.option(ChannelOption.SO_REUSEADDR, netConfig.boolVal("reuseAddr"));
        this.bootstrap.option(ChannelOption.TCP_NODELAY, netConfig.boolVal("tcpNoDelay"));
        try (CloseableThreadContext.Instance logCtx = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            this.connPool = this.createConnectionPool();
        }
    }

    /**
     * Creates the connection pool used by this driver. Invoked from the constructor after
     * the bootstrap has been configured.
     *
     * @return a new multi-node connection pool using this driver as the channel pool handler
     */
    protected NonBlockingConnPool createConnectionPool() {
        return new MultiNodeConnPoolImpl(
            storageNodeAddrs,
            bootstrap,
            this,
            storageNodePort,
            connAttemptsLimit,
            netTimeoutMilliSec,
            TimeUnit.MILLISECONDS
        );
    }

    /**
     * Sets the socket buffer size options on the shared bootstrap according to the average
     * transfer size and the operation type.
     *
     * <p>The transfer size is clamped to the range [4096, 0x1000000]. CREATE gets the clamped
     * size as the send buffer, READ gets it as the receive buffer; any other buffer is set
     * to 4096.
     *
     * @param avgTransferSize the average transfer size in bytes
     * @param opType the load operation type
     */
    public final void adjustIoBuffers(long avgTransferSize, OpType opType) {
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            final int minBufSize = 4096;
            final int maxBufSize = 0x1000000;
            final int adjustedSize = (int) Math.min(Math.max(avgTransferSize, (long) minBufSize), (long) maxBufSize);
            int rcvBufSize = minBufSize;
            int sndBufSize = minBufSize;
            if (OpType.CREATE.equals(opType)) {
                Loggers.MSG.info("Adjust output buffer size: {}", SizeInBytes.formatFixedSize((long) adjustedSize));
                sndBufSize = adjustedSize;
            } else if (OpType.READ.equals(opType)) {
                Loggers.MSG.info("Adjust input buffer size: {}", SizeInBytes.formatFixedSize((long) adjustedSize));
                rcvBufSize = adjustedSize;
            }
            this.bootstrap.option(ChannelOption.SO_RCVBUF, rcvBufSize);
            this.bootstrap.option(ChannelOption.SO_SNDBUF, sndBufSize);
        }
    }

    /**
     * Opens a new connection which is not managed by the connection pool.
     *
     * <p>If the address contains a ':' it is split into host and port, otherwise the given
     * port is used. The new channel's pipeline is populated via
     * {@link #appendHandlers(Channel)}.
     *
     * @param storageNodeAddr the node address, either "host" or "host:port"
     * @param storageNodePort the port to use if the address doesn't include one
     * @return the connected channel
     * @throws ConnectException if the connection attempt failed or timed out
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    protected Channel getUnpooledConnection(String storageNodeAddr, int storageNodePort) throws ConnectException, InterruptedException {
        final InetSocketAddress socketAddr;
        if (storageNodeAddr.contains(":")) {
            final String[] addrParts = storageNodeAddr.split(":");
            socketAddr = new InetSocketAddress(addrParts[0], Integer.parseInt(addrParts[1]));
        } else {
            socketAddr = new InetSocketAddress(storageNodeAddr, storageNodePort);
        }
        final Bootstrap unpooledBootstrap = new Bootstrap()
            .group(this.ioExecutor)
            .channel(this.socketChannelCls)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected final void initChannel(SocketChannel conn) throws Exception {
                    try (CloseableThreadContext.Instance logCtx = CloseableThreadContext.put("step_id", NettyStorageDriverBase.this.stepId).put("class_name", CLS_NAME)) {
                        NettyStorageDriverBase.this.appendHandlers(conn);
                        Loggers.MSG.debug("{}: new unpooled connection {}, pipeline: {}", NettyStorageDriverBase.this.stepId, conn.hashCode(), conn.pipeline());
                    }
                }
            });
        final ChannelFuture connFuture = unpooledBootstrap.connect(socketAddr);
        if (this.netTimeoutMilliSec <= 0) {
            return connFuture.sync().channel();
        }
        if (!connFuture.await(this.netTimeoutMilliSec, TimeUnit.MILLISECONDS)) {
            // don't leave the pending connection attempt behind
            connFuture.cancel(true);
            throw new ConnectTimeoutException();
        }
        // await() returning true only means the future is done, not that the connect succeeded
        if (!connFuture.isSuccess()) {
            final ConnectException failure = new ConnectException("Failed to connect to " + socketAddr);
            failure.initCause(connFuture.cause());
            throw failure;
        }
        return connFuture.channel();
    }

    /**
     * Starts the driver and, if the concurrency limit is positive, asks the connection pool
     * to pre-create that many connections. A connection failure is logged as a warning; an
     * interruption is rethrown unchecked.
     */
    protected void doStart() throws IllegalStateException {
        super.doStart();
        if (this.concurrencyLimit <= 0) {
            return;
        }
        try {
            this.connPool.preConnect(this.concurrencyLimit);
        } catch (ConnectException connFailure) {
            LogUtil.exception(Level.WARN, connFailure, "Failed to pre-create the connections");
        } catch (InterruptedException interruption) {
            com.github.akurilov.commons.lang.Exceptions.throwUnchecked(interruption);
        }
    }

    /**
     * Tries to submit a single load operation.
     *
     * <p>A concurrency throttle permit is acquired first. NOOP operations are completed
     * immediately without a connection; any other operation is bound to a connection leased
     * from the pool and passed to {@link #sendRequest(Channel, Operation)}. Failures while
     * leasing/sending complete the operation with a failure status.
     *
     * @param op the load operation to submit
     * @return {@code true} if the operation was accepted (including the case when it was
     *     completed with a failure), {@code false} if there was no free permit or connection
     * @throws IllegalStateException if the driver is not started
     */
    protected boolean submit(O op) throws IllegalStateException {
        ThreadContext.put("step_id", this.stepId);
        ThreadContext.put("class_name", CLS_NAME);
        if (!this.isStarted()) {
            throw new IllegalStateException();
        }
        if (!this.concurrencyThrottle.tryAcquire()) {
            return false;
        }
        try {
            if (OpType.NOOP.equals(op.type())) {
                op.startRequest();
                this.sendRequest(null, op);
                op.finishRequest();
                // NOTE(review): complete() releases a throttle permit too, so this path appears
                // to return two permits for the single one acquired above -- confirm it is intended
                this.concurrencyThrottle.release();
                op.status(Operation.Status.SUCC);
                op.startResponse();
                this.complete(null, op);
            } else {
                final Channel conn = this.connPool.lease();
                if (conn == null) {
                    // the operation is rejected, so give back the permit acquired above,
                    // otherwise it would never be released
                    this.concurrencyThrottle.release();
                    return false;
                }
                conn.attr(ATTR_KEY_OPERATION).set(op);
                op.nodeAddr(conn.attr(NonBlockingConnPool.ATTR_KEY_NODE).get());
                op.startRequest();
                this.sendRequest(conn, op);
            }
        } catch (ConnectException e) {
            LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation");
            op.status(Operation.Status.FAIL_IO);
            // complete() releases the permit
            this.complete(null, op);
        } catch (Throwable thrown) {
            Exceptions.throwUncheckedIfInterrupted(thrown);
            LogUtil.exception(Level.WARN, thrown, "Failed to submit the load operation");
            op.status(Operation.Status.FAIL_UNKNOWN);
            this.complete(null, op);
        }
        return true;
    }

    /**
     * Tries to submit the load operations from the given range of the list.
     *
     * <p>All available concurrency throttle permits are drained (the excess over the range
     * size is returned immediately) and one operation is submitted per permit. Permits which
     * end up unused - because no connection was available, the driver is no longer started or
     * a failure occurred - are returned to the throttle.
     *
     * @param ops the list of the load operations
     * @param from the index of the first operation to submit, inclusive
     * @param to the index of the last operation to submit, exclusive
     * @return the count of the operations submitted before the loop stopped; an operation
     *     which was completed with a failure status by the error handling is not counted
     * @throws IllegalStateException declared for the callers, not thrown directly here
     */
    protected int submit(List<O> ops, int from, int to) throws IllegalStateException {
        if (ops.isEmpty()) {
            return 0;
        }
        final int needed = to - from;
        // an empty or inverted range: nothing to do (and nothing to drain)
        if (needed <= 0) {
            return 0;
        }
        int permits = this.concurrencyThrottle.drainPermits();
        if (permits == 0) {
            return 0;
        }
        if (permits > needed) {
            this.concurrencyThrottle.release(permits - needed);
            permits = needed;
        }
        ThreadContext.put("step_id", this.stepId);
        ThreadContext.put("class_name", CLS_NAME);
        O nextOp = null;
        int n = 0;
        try {
            while (n < permits && this.isStarted()) {
                nextOp = ops.get(from + n);
                if (OpType.NOOP.equals(nextOp.type())) {
                    nextOp.startRequest();
                    this.sendRequest(null, nextOp);
                    nextOp.finishRequest();
                    // NOTE(review): complete() releases a throttle permit too, so this path appears
                    // to return two permits for a single drained one -- confirm it is intended
                    this.concurrencyThrottle.release();
                    nextOp.status(Operation.Status.SUCC);
                    nextOp.startResponse();
                    this.complete(null, nextOp);
                } else {
                    final Channel conn = this.connPool.lease();
                    if (conn == null) {
                        break;
                    }
                    conn.attr(ATTR_KEY_OPERATION).set(nextOp);
                    nextOp.nodeAddr(conn.attr(NonBlockingConnPool.ATTR_KEY_NODE).get());
                    nextOp.startRequest();
                    this.sendRequest(conn, nextOp);
                }
                ++n;
            }
            // the loop stopped early (no connection / not started): return the permits
            // which were drained but not used, otherwise they would be lost
            if (n < permits) {
                this.concurrencyThrottle.release(permits - n);
            }
        } catch (ConnectException e) {
            LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation");
            nextOp.status(Operation.Status.FAIL_IO);
            // complete() releases one permit for the failed operation, the rest is released below
            this.complete(null, nextOp);
            if (permits - n > 1) {
                this.concurrencyThrottle.release(permits - n - 1);
            }
        } catch (Throwable thrown) {
            Exceptions.throwUncheckedIfInterrupted(thrown);
            LogUtil.exception(Level.WARN, thrown, "Failed to submit the load operations");
            nextOp.status(Operation.Status.FAIL_UNKNOWN);
            this.complete(null, nextOp);
            if (permits - n > 1) {
                this.concurrencyThrottle.release(permits - n - 1);
            }
        }
        return n;
    }

    /**
     * Tries to submit all load operations from the given list.
     *
     * @param ops the list of the load operations
     * @return the value returned by {@link #submit(List, int, int)} for the whole list range
     */
    protected final int submit(List<O> ops) throws IllegalStateException {
        final int opCount = ops.size();
        return submit(ops, 0, opCount);
    }

    /**
     * Sends the request for the given load operation. Invoked by the submit methods after
     * the operation's request start has been marked.
     *
     * @param var1 the connection leased from the pool, or {@code null} for NOOP operations
     * @param var2 the load operation to send the request for
     */
    protected abstract void sendRequest(Channel var1, O var2);

    /**
     * Writes the payload of a CREATE or UPDATE operation on a data item to the channel.
     *
     * <ul>
     *   <li>CREATE (non-composite): the whole item is written if it is not empty and the
     *       operation has no source path; the item size is counted as done.</li>
     *   <li>UPDATE: either the marked (random) ranges or the fixed ranges are written; the
     *       marked ranges size is counted as done.</li>
     * </ul>
     * Any other operation or item type is ignored.
     *
     * @param channel the connection to write the data to
     * @param op the load operation
     * @throws IOException declared because the data item accessors may fail
     */
    protected final void sendRequestData(Channel channel, O op) throws IOException {
        final OpType opType = op.type();
        if (OpType.CREATE.equals(opType)) {
            final Item item = op.item();
            if (item instanceof DataItem) {
                final DataOperation dataOp = (DataOperation) op;
                if (!(dataOp instanceof CompositeDataOperation)) {
                    final DataItem dataItem = (DataItem) item;
                    final String srcPath = dataOp.srcPath();
                    if (0L < dataItem.size() && (null == srcPath || srcPath.isEmpty())) {
                        this.writeDataChunk(channel, dataItem);
                    }
                    dataOp.countBytesDone(dataItem.size());
                }
            }
        } else if (OpType.UPDATE.equals(opType)) {
            final Item item = op.item();
            if (item instanceof DataItem) {
                final DataItem dataItem = (DataItem) item;
                final DataOperation dataOp = (DataOperation) op;
                @SuppressWarnings("unchecked")
                final List<Range> fixedRanges = dataOp.fixedRanges();
                if (fixedRanges == null || fixedRanges.isEmpty()) {
                    this.writeMarkedRangeUpdates(channel, dataItem, dataOp);
                } else {
                    this.writeFixedRangeUpdates(channel, dataItem, dataOp, fixedRanges);
                }
                dataOp.countBytesDone(dataOp.markedRangesSize());
            }
        }
    }

    /** Writes one piece of data: as a chunked stream if SSL is enabled, as a file region otherwise. */
    private void writeDataChunk(Channel channel, DataItem data) throws IOException {
        if (this.sslFlag) {
            channel.write(new SeekableByteChannelChunkedNioStream((SeekableByteChannel) data));
        } else {
            channel.write(new DataItemFileRegion(data));
        }
    }

    /** Writes the updates of the ranges marked in the 1st and then the 2nd mask, then commits them to the item. */
    private void writeMarkedRangeUpdates(Channel channel, DataItem dataItem, DataOperation dataOp) throws IOException {
        final BitSet[] updRangesMaskPair = dataOp.markedRangesMaskPair();
        final int rangeCount = DataItem.rangeCount(dataItem.size());
        for (int maskIdx = 0; maskIdx < 2; ++maskIdx) {
            for (int i = 0; i < rangeCount; ++i) {
                if (!updRangesMaskPair[maskIdx].get(i)) {
                    continue;
                }
                dataOp.currRangeIdx(i);
                this.writeDataChunk(channel, dataOp.currRangeUpdate());
            }
        }
        dataItem.commitUpdatedRanges(dataOp.markedRangesMaskPair());
    }

    /** Resolves each fixed range to an (offset, size) pair against the base item size and writes the slice. */
    private void writeFixedRangeUpdates(Channel channel, DataItem dataItem, DataOperation dataOp, List<Range> fixedRanges) throws IOException {
        final long baseItemSize = dataItem.size();
        for (final Range fixedRange : fixedRanges) {
            long beg = fixedRange.getBeg();
            final long end = fixedRange.getEnd();
            long size = fixedRange.getSize();
            if (size == -1L) {
                if (beg == -1L) {
                    // only the end is set: the last "end" bytes
                    beg = baseItemSize - end;
                    size = end;
                } else if (end == -1L) {
                    // only the beginning is set: from "beg" up to the item's end
                    size = baseItemSize - beg;
                } else {
                    size = end - beg + 1L;
                }
            } else {
                // an explicit size: the data goes after the base item's end, the item size grows
                beg = baseItemSize;
                dataItem.size(dataItem.size() + dataOp.markedRangesSize());
            }
            this.writeDataChunk(channel, dataItem.slice(beg, size));
        }
    }

    /**
     * Marks the request as finished for the operation bound to the future's channel. An
     * illegal operation state is logged on the debug level only.
     *
     * @param future the completed channel future
     */
    void sendFullRequestComplete(ChannelFuture future) {
        final Channel conn = future.channel();
        final Operation boundOp = (Operation) conn.attr(ATTR_KEY_OPERATION).get();
        try {
            boundOp.finishRequest();
        } catch (IllegalStateException stateFailure) {
            LogUtil.exception(Level.DEBUG, stateFailure, "{}", boundOp.toString());
        }
    }

    /**
     * Finishes the load operation: marks the response as finished (an illegal state is
     * logged on the debug level), releases one concurrency throttle permit, returns the
     * channel (if any) to the connection pool and hands the operation over as completed.
     *
     * @param channel the connection used by the operation, may be {@code null}
     * @param op the load operation being completed
     */
    @Override
    public void complete(Channel channel, O op) {
        ThreadContext.put("class_name", CLS_NAME);
        ThreadContext.put("step_id", this.stepId);
        try {
            op.finishResponse();
        } catch (IllegalStateException stateFailure) {
            LogUtil.exception(Level.DEBUG, stateFailure, "{}: invalid load operation state", op.toString());
        }
        this.concurrencyThrottle.release();
        final boolean hasConnection = channel != null;
        if (hasConnection) {
            this.connPool.release(channel);
        }
        handleCompleted(op);
    }

    /** Channel pool callback; intentionally does nothing. */
    @Override
    public final void channelReleased(Channel channel) throws Exception {
    }

    /** Channel pool callback; intentionally does nothing. */
    @Override
    public final void channelAcquired(Channel channel) throws Exception {
    }

    /**
     * Channel pool callback: populates the pipeline of the newly created channel and traces
     * the resulting pipeline if trace logging is enabled.
     *
     * @param channel the newly created channel
     */
    @Override
    public final void channelCreated(Channel channel) throws Exception {
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            appendHandlers(channel);
            if (!Loggers.MSG.isTraceEnabled()) {
                return;
            }
            final String pipelineDescr = channel.pipeline().toString();
            Loggers.MSG.trace("{}: new channel pipeline configured: {}", this.stepId, pipelineDescr);
        }
    }

    /**
     * Appends the base handlers to the channel's pipeline: the SSL handler if SSL is enabled
     * and the idle state handler (reader, writer and all idle times equal to the network
     * timeout) if the timeout is positive.
     *
     * @param channel the channel to configure
     */
    protected void appendHandlers(Channel channel) {
        final ChannelPipeline handlers = channel.pipeline();
        if (this.sslFlag) {
            Loggers.MSG.debug("{}: SSL/TLS is enabled for the channel", this.stepId);
            handlers.addLast(SslUtil.CLIENT_SSL_CONTEXT.newHandler(channel.alloc()));
        }
        final int idleTimeoutMillis = this.netTimeoutMilliSec;
        if (idleTimeoutMillis <= 0) {
            return;
        }
        handlers.addLast(new IdleStateHandler(idleTimeoutMillis, idleTimeoutMillis, idleTimeoutMillis, TimeUnit.MILLISECONDS));
    }

    /**
     * Shuts the I/O executor down (zero quiet period and timeout) and waits up to one
     * microsecond for the termination, logging whether the workers stopped in time. An
     * interruption is logged and rethrown unchecked.
     */
    protected final void doStop() throws IllegalStateException {
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            // the nested try keeps the logging context in place for the catch block too
            try {
                Loggers.MSG.debug("{}: shutdown the I/O executor", toString());
                final boolean stoppedInTime = this.ioExecutor
                    .shutdownGracefully(0L, 0L, TimeUnit.NANOSECONDS)
                    .await(1L, TimeUnit.MICROSECONDS);
                if (stoppedInTime) {
                    Loggers.MSG.debug("{}: I/O workers stopped in time", toString());
                } else {
                    Loggers.ERR.debug("{}: I/O workers stopping timeout", toString());
                }
            } catch (InterruptedException interruption) {
                LogUtil.exception(Level.WARN, interruption, "Graceful I/O workers shutdown was interrupted");
                com.github.akurilov.commons.lang.Exceptions.throwUnchecked(interruption);
            }
        }
    }

    /**
     * Closes the connection pool (an I/O failure is logged as a warning and doesn't prevent
     * the further closing) and then lets the superclass close its own resources.
     *
     * @throws IOException if the superclass closing fails
     */
    protected void doClose() throws IllegalStateException, IOException {
        try {
            this.connPool.close();
        } catch (IOException closeFailure) {
            LogUtil.exception(Level.WARN, closeFailure, "{}: failed to close the connection pool", toString());
        }
        super.doClose();
    }
}

