package com.emc.mongoose.storage.driver.coop.netty;

import com.emc.mongoose.base.config.IllegalConfigurationException;
import com.emc.mongoose.base.data.DataInput;
import com.emc.mongoose.base.item.DataItem;
import com.emc.mongoose.base.item.Item;
import com.emc.mongoose.base.item.op.OpType;
import com.emc.mongoose.base.item.op.Operation;
import com.emc.mongoose.base.item.op.composite.data.CompositeDataOperation;
import com.emc.mongoose.base.item.op.data.DataOperation;
import com.emc.mongoose.base.logging.LogContextThreadFactory;
import com.emc.mongoose.base.logging.LogUtil;
import com.emc.mongoose.base.logging.Loggers;
import com.emc.mongoose.storage.driver.coop.CoopStorageDriverBase;
import com.emc.mongoose.storage.driver.coop.netty.NettyStorageDriver;
import com.emc.mongoose.storage.driver.coop.netty.data.DataItemFileRegion;
import com.emc.mongoose.storage.driver.coop.netty.data.SeekableByteChannelChunkedNioStream;
import com.github.akurilov.commons.collection.Range;
import com.github.akurilov.commons.concurrent.ThreadUtil;
import com.github.akurilov.commons.lang.Exceptions;
import com.github.akurilov.commons.system.SizeInBytes;
import com.github.akurilov.confuse.Config;
import com.github.akurilov.netty.connection.pool.MultiNodeConnPoolImpl;
import com.github.akurilov.netty.connection.pool.NonBlockingConnPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.internal.ChannelUtils;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.CloseableThreadContext;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.ThreadContext;

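/**
 * Base class for storage drivers which perform their I/O over Netty channels leased from a
 * {@link NonBlockingConnPool}. It also acts as the {@link ChannelPoolHandler} of that pool and
 * configures the pipeline of every newly created channel.
 *
 * <p>Decompiled from: input_file:com/emc/mongoose/storage/driver/coop/netty/NettyStorageDriverBase.class
 */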
public abstract class NettyStorageDriverBase<I extends Item, O extends Operation<I>> extends CoopStorageDriverBase<I, O> implements NettyStorageDriver<I, O>, ChannelPoolHandler {
    private static final String CLS_NAME = NettyStorageDriverBase.class.getSimpleName();
    private final EventLoopGroup ioExecutor;
    protected final String[] storageNodeAddrs;
    protected final Bootstrap bootstrap;
    protected final int storageNodePort;
    protected final int connAttemptsLimit;
    protected final int netTimeoutMilliSec;
    private final Class<SocketChannel> socketChannelCls;
    private final NonBlockingConnPool connPool;
    private final boolean sslFlag;
    protected final ChannelFutureListener reqSentCallback;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the {@code net} configuration subtree, creates the I/O event loop group and the client
     * {@link Bootstrap}, then creates the connection pool via {@link #createConnectionPool()}.
     *
     * <p>A non-positive {@code net.timeoutMilliSec} is stored as {@link Integer#MAX_VALUE} in
     * {@link #netTimeoutMilliSec}. Node addresses without a {@code ':'} get the configured port appended.
     *
     * @param str passed to the superclass constructor; also used as the prefix of the SSL/TLS log message
     * @param dataInput passed to the superclass constructor
     * @param config the configuration; the {@code net} subtree and {@code driver-threads} are read here
     * @param z passed to the superclass constructor
     * @param i passed to the superclass constructor
     * @throws IllegalConfigurationException declared for the superclass constructor / subclasses
     * @throws InterruptedException declared for the superclass constructor / subclasses
     * @throws AssertionError if the transport specific event loop group or socket channel class
     *     cannot be loaded or instantiated
     */
    public NettyStorageDriverBase(String str, DataInput dataInput, Config config, boolean z, int i) throws IllegalConfigurationException, InterruptedException {
        super(str, dataInput, config, z, i);
        this.reqSentCallback = this::sendFullRequestComplete;

        final Config netConfig = config.configVal("net");
        this.sslFlag = netConfig.boolVal("ssl");
        if (this.sslFlag) {
            Loggers.MSG.info("{}: SSL/TLS is enabled", str);
        }
        final int configuredTimeout = netConfig.intVal("timeoutMilliSec");
        // a non-positive value means "no timeout", represented by the largest possible one
        this.netTimeoutMilliSec = configuredTimeout > 0 ? configuredTimeout : Integer.MAX_VALUE;

        final Config nodeConfig = netConfig.configVal("node");
        this.storageNodePort = nodeConfig.intVal("port");
        this.connAttemptsLimit = nodeConfig.intVal("connAttemptsLimit");
        final String[] configuredAddrs = (String[]) nodeConfig.listVal("addrs").toArray(new String[0]);
        this.storageNodeAddrs = new String[configuredAddrs.length];
        for (int idx = 0; idx < configuredAddrs.length; idx++) {
            final String addr = configuredAddrs[idx];
            // keep an explicitly specified port, otherwise append the default one
            this.storageNodeAddrs[idx] = addr.contains(":") ? addr : addr + ":" + this.storageNodePort;
        }

        final int configuredWorkerCount = config.intVal("driver-threads");
        final int workerCount = configuredWorkerCount < 1 ? ThreadUtil.getHardwareThreadCount() : configuredWorkerCount;

        final String transportName = netConfig.stringVal("transport");
        final NettyStorageDriver.Transport transport;
        if (transportName == null || transportName.isEmpty()) {
            // no explicit choice: prefer the native transports when they are available
            if (Epoll.isAvailable()) {
                transport = NettyStorageDriver.Transport.EPOLL;
            } else if (KQueue.isAvailable()) {
                transport = NettyStorageDriver.Transport.KQUEUE;
            } else {
                transport = NettyStorageDriver.Transport.NIO;
            }
        } else {
            // Locale.ROOT keeps the enum lookup independent of the default locale (e.g. Turkish "i")
            transport = NettyStorageDriver.Transport.valueOf(transportName.toUpperCase(java.util.Locale.ROOT));
        }

        final Class<?> ioExecutorCls;
        try {
            ioExecutorCls = Class.forName(IO_EXECUTOR_IMPLS.get(transport));
            this.ioExecutor = (EventLoopGroup) ioExecutorCls
                .getConstructor(Integer.TYPE, ThreadFactory.class)
                .newInstance(workerCount, new LogContextThreadFactory("ioWorker", true));
        } catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }
        Loggers.MSG.info("{}: use {} I/O workers", toString(), workerCount);

        try {
            // the setter is looked up reflectively on the concrete event loop group class
            ioExecutorCls.getMethod("setIoRatio", Integer.TYPE).invoke(this.ioExecutor, netConfig.intVal("ioRatio"));
        } catch (ReflectiveOperationException e) {
            LogUtil.exception(Level.ERROR, e, "Failed to set the I/O ratio", new Object[0]);
        }

        try {
            @SuppressWarnings("unchecked") // the name is looked up in SOCKET_CHANNEL_IMPLS for the chosen transport
            final Class<SocketChannel> channelCls = (Class<SocketChannel>) Class.forName(SOCKET_CHANNEL_IMPLS.get(transport));
            this.socketChannelCls = channelCls;
        } catch (ReflectiveOperationException e) {
            throw new AssertionError(e);
        }

        this.bootstrap = new Bootstrap().group(this.ioExecutor).channel(this.socketChannelCls);
        this.bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, netConfig.intVal("timeoutMilliSec"));
        this.bootstrap.option(ChannelOption.WRITE_SPIN_COUNT, 1);
        final int rcvBuf = netConfig.intVal("rcvBuf");
        if (rcvBuf > 0) {
            this.bootstrap.option(ChannelOption.SO_RCVBUF, rcvBuf);
        }
        final int sndBuf = netConfig.intVal("sndBuf");
        if (sndBuf > 0) {
            this.bootstrap.option(ChannelOption.SO_SNDBUF, sndBuf);
        }
        this.bootstrap.option(ChannelOption.SO_KEEPALIVE, netConfig.boolVal("keepAlive"));
        this.bootstrap.option(ChannelOption.SO_LINGER, netConfig.intVal("linger"));
        this.bootstrap.option(ChannelOption.SO_REUSEADDR, netConfig.boolVal("reuseAddr"));
        this.bootstrap.option(ChannelOption.TCP_NODELAY, netConfig.boolVal("tcpNoDelay"));

        // NOTE(review): createConnectionPool() is overridable and is invoked while the object is
        // still under construction, so overriding implementations must not rely on subclass state
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            this.connPool = createConnectionPool();
        }
    }

    /**
     * Builds the connection pool used by this driver. The pool is created for all configured node
     * addresses, with this driver's bootstrap and with this driver as the channel pool handler.
     *
     * @return a new {@link MultiNodeConnPoolImpl} instance
     */
    protected NonBlockingConnPool createConnectionPool() {
        final NonBlockingConnPool pool = new MultiNodeConnPoolImpl(
            this.storageNodeAddrs,
            this.bootstrap,
            this,
            this.storageNodePort,
            this.connAttemptsLimit,
            this.netTimeoutMilliSec,
            TimeUnit.MILLISECONDS
        );
        return pool;
    }

    /**
     * Adjusts the socket buffer size options of the bootstrap according to the given size and
     * operation type. The size is clamped to the range [4096, 16777216]. For {@code CREATE} the
     * send buffer gets the clamped size and the receive buffer gets 4096, for {@code READ} it is
     * the other way around, and for any other operation type both buffers are set to 4096.
     *
     * @param j the desired buffer size in bytes
     * @param opType the operation type determining which buffer is enlarged
     */
    public final void adjustIoBuffers(long j, OpType opType) {
        final int minBuffSize = 4096;
        final int maxBuffSize = 16777216;
        // try-with-resources guarantees the logging context is restored even if an option call throws
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            final int size = (int) Math.max(minBuffSize, Math.min(maxBuffSize, j));
            if (OpType.CREATE.equals(opType)) {
                Loggers.MSG.info("Adjust output buffer size: {}", SizeInBytes.formatFixedSize(size));
                this.bootstrap.option(ChannelOption.SO_RCVBUF, minBuffSize);
                this.bootstrap.option(ChannelOption.SO_SNDBUF, size);
            } else if (OpType.READ.equals(opType)) {
                Loggers.MSG.info("Adjust input buffer size: {}", SizeInBytes.formatFixedSize(size));
                this.bootstrap.option(ChannelOption.SO_RCVBUF, size);
                this.bootstrap.option(ChannelOption.SO_SNDBUF, minBuffSize);
            } else {
                this.bootstrap.option(ChannelOption.SO_RCVBUF, minBuffSize);
                this.bootstrap.option(ChannelOption.SO_SNDBUF, minBuffSize);
            }
        }
    }

    /**
     * Opens a new connection which is not managed by the connection pool. The channel pipeline is
     * configured with {@link #appendHandlers(Channel)}.
     *
     * @param str the node address, either {@code host} or {@code host:port}
     * @param i the port to use when {@code str} doesn't contain a {@code ':'}
     * @return the connected channel
     * @throws ConnectException if the connection attempt fails; a {@link ConnectTimeoutException}
     *     if it doesn't finish within {@link #netTimeoutMilliSec} milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected Channel getUnpooledConnection(String str, int i) throws ConnectException, InterruptedException {
        final InetSocketAddress target;
        if (str.contains(":")) {
            final String[] hostAndPort = str.split(":");
            target = new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
        } else {
            target = new InetSocketAddress(str, i);
        }
        final Bootstrap unpooledBootstrap = new Bootstrap()
            .group(this.ioExecutor)
            .channel(this.socketChannelCls)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public final void initChannel(SocketChannel socketChannel) throws Exception {
                    try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", NettyStorageDriverBase.this.stepId).put("class_name", NettyStorageDriverBase.CLS_NAME)) {
                        NettyStorageDriverBase.this.appendHandlers(socketChannel);
                        Loggers.MSG.debug("{}: new unpooled connection {}, pipeline: {}", NettyStorageDriverBase.this.stepId, socketChannel.hashCode(), socketChannel.pipeline());
                    }
                }
            });
        final ChannelFuture connFuture = unpooledBootstrap.connect(target);
        if (this.netTimeoutMilliSec <= 0) {
            // no timeout: sync() waits for completion and rethrows the failure cause, if any
            return connFuture.sync().channel();
        }
        if (!connFuture.await(this.netTimeoutMilliSec, TimeUnit.MILLISECONDS)) {
            // do not leave the pending connection attempt behind
            connFuture.cancel(true);
            throw new ConnectTimeoutException("Connection to " + target + " timed out after " + this.netTimeoutMilliSec + " ms");
        }
        if (!connFuture.isSuccess()) {
            // await() returning true only means "done", which includes failure and cancellation
            final ConnectException failure = new ConnectException("Failed to connect to " + target);
            failure.initCause(connFuture.cause());
            throw failure;
        }
        return connFuture.channel();
    }

    /**
     * Starts the driver and, when the concurrency limit is positive, asks the connection pool to
     * pre-create that many connections. A connection failure is only logged as a warning; an
     * interruption is rethrown unchecked.
     */
    protected void doStart() throws IllegalStateException {
        super.doStart();
        if (this.concurrencyLimit <= 0) {
            // nothing to pre-create when the concurrency limit is not positive
            return;
        }
        try {
            this.connPool.preConnect(this.concurrencyLimit);
        } catch (InterruptedException interruption) {
            Exceptions.throwUnchecked(interruption);
        } catch (ConnectException connFailure) {
            LogUtil.exception(Level.WARN, connFailure, "Failed to pre-create the connections", new Object[0]);
        }
    }

    /**
     * Tries to submit a single operation for execution.
     *
     * <p>A permit is acquired from the concurrency throttle first. {@code NOOP} operations are
     * completed immediately without a connection; any other operation gets a connection leased
     * from the pool and is passed to {@link #sendRequest(Channel, Operation)}. If no connection is
     * available the acquired permit is given back, so the caller may simply retry later.
     *
     * @param o the operation to submit
     * @return {@code true} if the operation was consumed (sent, or completed with a failure status),
     *     {@code false} if there was no free permit or no free connection
     * @throws IllegalStateException if the driver is not started
     */
    protected boolean submit(O o) throws IllegalStateException {
        ThreadContext.put("step_id", this.stepId);
        ThreadContext.put("class_name", CLS_NAME);
        if (!isStarted()) {
            throw new IllegalStateException();
        }
        if (!this.concurrencyThrottle.tryAcquire()) {
            return false;
        }
        try {
            if (OpType.NOOP.equals(o.type())) {
                o.startRequest();
                sendRequest(null, o);
                o.finishRequest();
                // NOTE(review): complete() below releases a permit too, so this looks like a
                // double release for NOOP operations - kept as is, confirm the intent
                this.concurrencyThrottle.release();
                o.status(Operation.Status.SUCC);
                o.startResponse();
                complete(null, o);
            } else {
                final Channel conn = this.connPool.lease();
                if (conn == null) {
                    // the operation is rejected, so the permit acquired above must not be kept
                    this.concurrencyThrottle.release();
                    return false;
                }
                conn.attr(ATTR_KEY_OPERATION).set(o);
                o.nodeAddr((String) conn.attr(NonBlockingConnPool.ATTR_KEY_NODE).get());
                o.startRequest();
                sendRequest(conn, o);
            }
            return true;
        } catch (ConnectException e) {
            LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation", new Object[0]);
            o.status(Operation.Status.FAIL_IO);
            complete(null, o);
            return true;
        } catch (Throwable th) {
            com.emc.mongoose.base.Exceptions.throwUncheckedIfInterrupted(th);
            LogUtil.exception(Level.WARN, th, "Failed to submit the load operation", new Object[0]);
            o.status(Operation.Status.FAIL_UNKNOWN);
            complete(null, o);
            return true;
        }
    }

    /**
     * Tries to submit the operations from the range {@code [i, i2)} of the given list.
     *
     * <p>All currently free permits are drained from the concurrency throttle, capped by the range
     * length. Operations are then submitted one by one until the permits are used up, the driver
     * is no longer started, no connection is available or an operation fails. A failed operation
     * is completed with a failure status and counted as consumed, consistently with the single
     * operation {@code submit}; the batch stops there. Permits which were drained but not used are
     * always given back to the throttle.
     *
     * @param list the operations
     * @param i the index of the first operation to submit (inclusive)
     * @param i2 the index of the last operation to submit (exclusive)
     * @return the count of the operations consumed, starting from index {@code i}
     */
    protected int submit(List<O> list, int i, int i2) throws IllegalStateException {
        final int requested = i2 - i;
        // a negative range length would otherwise release more permits than were drained
        if (list.isEmpty() || requested <= 0) {
            return 0;
        }
        int permits = this.concurrencyThrottle.drainPermits();
        if (permits == 0) {
            return 0;
        }
        if (permits > requested) {
            this.concurrencyThrottle.release(permits - requested);
            permits = requested;
        }
        ThreadContext.put("step_id", this.stepId);
        ThreadContext.put("class_name", CLS_NAME);
        int consumed = 0;
        try {
            while (consumed < permits && isStarted()) {
                final O op = list.get(i + consumed);
                try {
                    if (OpType.NOOP.equals(op.type())) {
                        op.startRequest();
                        sendRequest(null, op);
                        op.finishRequest();
                        // NOTE(review): complete() below releases a permit too, so this looks like
                        // a double release for NOOP operations - kept as is, confirm the intent
                        this.concurrencyThrottle.release();
                        op.status(Operation.Status.SUCC);
                        op.startResponse();
                        complete(null, op);
                    } else {
                        final Channel conn = this.connPool.lease();
                        if (conn == null) {
                            // no free connection: stop here, the unused permits are returned below
                            break;
                        }
                        conn.attr(ATTR_KEY_OPERATION).set(op);
                        op.nodeAddr((String) conn.attr(NonBlockingConnPool.ATTR_KEY_NODE).get());
                        op.startRequest();
                        sendRequest(conn, op);
                    }
                    consumed++;
                } catch (ConnectException e) {
                    LogUtil.exception(Level.WARN, e, "Failed to lease the connection for the load operation", new Object[0]);
                    op.status(Operation.Status.FAIL_IO);
                    complete(null, op); // releases the permit held for this operation
                    consumed++;
                    break; // do not retry the same operation within this batch
                } catch (Throwable th) {
                    com.emc.mongoose.base.Exceptions.throwUncheckedIfInterrupted(th);
                    LogUtil.exception(Level.WARN, th, "Failed to submit the load operations", new Object[0]);
                    op.status(Operation.Status.FAIL_UNKNOWN);
                    complete(null, op); // releases the permit held for this operation
                    consumed++;
                    break; // do not retry the same operation within this batch
                }
            }
        } finally {
            // give back every drained permit which is not held by a consumed operation
            if (consumed < permits) {
                this.concurrencyThrottle.release(permits - consumed);
            }
        }
        return consumed;
    }

    /**
     * Tries to submit all operations of the given list; a shortcut for the ranged {@code submit}
     * invoked with the range {@code [0, list.size())}.
     *
     * @param list the operations to submit
     * @return the value returned by the ranged {@code submit}
     */
    protected final int submit(List<O> list) throws IllegalStateException {
        final int from = 0;
        final int to = list.size();
        return submit(list, from, to);
    }

    /**
     * Sends the request for the given operation. Invoked by the {@code submit} methods after
     * {@code startRequest()} was called on the operation.
     *
     * @param channel the connection leased from the pool, or {@code null} for {@code NOOP} operations
     * @param o the operation to send the request for
     */
    protected abstract void sendRequest(Channel channel, O o);

    /**
     * Writes the payload of a {@code CREATE} or {@code UPDATE} data operation to the channel and
     * accounts the bytes on the operation. Nothing is done for other operation types or when the
     * item is not a {@link DataItem}.
     *
     * <ul>
     *   <li>{@code CREATE}: composite operations are skipped; the whole item is written if it is
     *       not empty and the operation has no source path.</li>
     *   <li>{@code UPDATE} without fixed ranges: the ranges marked in the first mask and then in the
     *       second mask are written, after that the updated ranges are committed on the item.</li>
     *   <li>{@code UPDATE} with fixed ranges: a slice of the item is written for every range.</li>
     * </ul>
     *
     * @param channel the channel to write to
     * @param o the operation
     * @throws IOException declared by the original signature
     */
    protected final void sendRequestData(Channel channel, O o) throws IOException {
        final OpType opType = o.type();
        final boolean createFlag = OpType.CREATE.equals(opType);
        if (!createFlag && !OpType.UPDATE.equals(opType)) {
            return;
        }
        final Item anyItem = o.item();
        if (!(anyItem instanceof DataItem)) {
            return;
        }
        final DataItem dataItem = (DataItem) anyItem;
        final DataOperation dataOp = (DataOperation) o;

        if (createFlag) {
            if (dataOp instanceof CompositeDataOperation) {
                return;
            }
            final String srcPath = dataOp.srcPath();
            if (0 < dataItem.size() && (null == srcPath || srcPath.isEmpty())) {
                writeData(channel, dataItem);
            }
            dataOp.countBytesDone(dataItem.size());
            return;
        }

        final List<Range> fixedRanges = dataOp.fixedRanges();
        if (fixedRanges == null || fixedRanges.isEmpty()) {
            // random ranges update: the 1st mask is processed completely before the 2nd one
            final BitSet[] maskPair = dataOp.markedRangesMaskPair();
            final int rangeCount = DataItem.rangeCount(dataItem.size());
            for (int maskIdx = 0; maskIdx < 2; maskIdx++) {
                for (int rangeIdx = 0; rangeIdx < rangeCount; rangeIdx++) {
                    if (maskPair[maskIdx].get(rangeIdx)) {
                        dataOp.currRangeIdx(rangeIdx);
                        writeData(channel, dataOp.currRangeUpdate());
                    }
                }
            }
            dataItem.commitUpdatedRanges(dataOp.markedRangesMaskPair());
        } else {
            // the size is captured once, before any range may enlarge the item
            final long baseSize = dataItem.size();
            for (final Range range : fixedRanges) {
                long offset = range.getBeg();
                final long end = range.getEnd();
                long length = range.getSize();
                if (length != -1) {
                    // the range size is set: the data goes after the captured end, the item grows
                    offset = baseSize;
                    dataItem.size(dataItem.size() + dataOp.markedRangesSize());
                } else if (offset == -1) {
                    // no beginning: the last "end" bytes
                    offset = baseSize - end;
                    length = end;
                } else if (end == -1) {
                    // no end: from the beginning offset up to the captured end
                    length = baseSize - offset;
                } else {
                    length = (end - offset) + 1;
                }
                writeData(channel, dataItem.slice(offset, length));
            }
        }
        dataOp.countBytesDone(dataOp.markedRangesSize());
    }

    /**
     * Writes the given data to the channel, wrapped into a chunked stream if SSL/TLS is enabled
     * and into a file region otherwise.
     */
    private void writeData(Channel channel, DataItem data) throws IOException {
        if (this.sslFlag) {
            channel.write(new SeekableByteChannelChunkedNioStream(data));
        } else {
            channel.write(new DataItemFileRegion(data));
        }
    }

    /**
     * The "request sent" callback: marks the request as finished on the operation attached to the
     * channel of the given future. An {@link IllegalStateException} thrown by the operation is
     * only logged at the debug level.
     *
     * @param channelFuture the completed write future
     */
    void sendFullRequestComplete(ChannelFuture channelFuture) {
        final Channel conn = channelFuture.channel();
        final Operation sentOp = (Operation) conn.attr(ATTR_KEY_OPERATION).get();
        try {
            sentOp.finishRequest();
        } catch (IllegalStateException stateFailure) {
            final Object[] logArgs = new Object[]{sentOp.toString()};
            LogUtil.exception(Level.DEBUG, stateFailure, "{}", logArgs);
        }
    }

    /**
     * Completes the given operation: finishes its response, releases one permit of the concurrency
     * throttle, returns the channel (if any) to the connection pool and hands the operation over
     * to {@code handleCompleted}.
     *
     * @param channel the channel used by the operation, may be {@code null}
     * @param o the operation to complete
     */
    @Override
    public void complete(Channel channel, O o) {
        ThreadContext.put("step_id", this.stepId);
        ThreadContext.put("class_name", CLS_NAME);
        try {
            o.finishResponse();
        } catch (IllegalStateException stateFailure) {
            final Object[] logArgs = new Object[]{o.toString()};
            LogUtil.exception(Level.DEBUG, stateFailure, "{}: invalid load operation state", logArgs);
        }
        this.concurrencyThrottle.release();
        final boolean hasConnection = channel != null;
        if (hasConnection) {
            this.connPool.release(channel);
        }
        handleCompleted(o);
    }

    /**
     * Channel pool callback for a released channel. Intentionally does nothing.
     *
     * @param channel the released channel (unused)
     */
    @Override // io.netty.channel.pool.ChannelPoolHandler
    public final void channelReleased(Channel channel) throws Exception {
    }

    /**
     * Channel pool callback for an acquired channel. Intentionally does nothing.
     *
     * @param channel the acquired channel (unused)
     */
    @Override // io.netty.channel.pool.ChannelPoolHandler
    public final void channelAcquired(Channel channel) throws Exception {
    }

    /**
     * Channel pool callback for a newly created channel: configures its pipeline with
     * {@link #appendHandlers(Channel)} and traces the resulting pipeline if trace logging is on.
     *
     * @param channel the new channel
     */
    @Override
    public final void channelCreated(Channel channel) throws Exception {
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            appendHandlers(channel);
            if (!Loggers.MSG.isTraceEnabled()) {
                return;
            }
            final String pipelineDescription = channel.pipeline().toString();
            Loggers.MSG.trace("{}: new channel pipeline configured: {}", this.stepId, pipelineDescription);
        }
    }

    /**
     * Appends the base handlers to the pipeline of the given channel: the SSL/TLS handler if
     * SSL/TLS is enabled, then an {@link IdleStateHandler} using the network timeout for the
     * reader, writer and "all" idle times if that timeout is positive.
     *
     * @param channel the channel to configure
     */
    protected void appendHandlers(Channel channel) {
        final ChannelPipeline handlers = channel.pipeline();
        if (this.sslFlag) {
            Loggers.MSG.debug("{}: SSL/TLS is enabled for the channel", this.stepId);
            handlers.addLast(SslUtil.CLIENT_SSL_CONTEXT.newHandler(channel.alloc()));
        }
        final long idleTimeoutMillis = this.netTimeoutMilliSec;
        if (idleTimeoutMillis > 0) {
            final IdleStateHandler idleHandler = new IdleStateHandler(
                idleTimeoutMillis, idleTimeoutMillis, idleTimeoutMillis, TimeUnit.MILLISECONDS
            );
            handlers.addLast(idleHandler);
        }
    }

    /**
     * Stops the driver by shutting down the I/O executor with zero quiet period and zero timeout
     * and waiting at most 1 microsecond for the termination. The outcome is logged at the debug
     * level; an interruption is logged as a warning and rethrown unchecked.
     */
    protected final void doStop() throws IllegalStateException {
        try (CloseableThreadContext.Instance ignored = CloseableThreadContext.put("step_id", this.stepId).put("class_name", CLS_NAME)) {
            // the interruption is handled inside the resource scope to keep the logging context
            try {
                Loggers.MSG.debug("{}: shutdown the I/O executor", toString());
                final boolean stoppedInTime = this.ioExecutor
                    .shutdownGracefully(0L, 0L, TimeUnit.NANOSECONDS)
                    .await(1L, TimeUnit.MICROSECONDS);
                if (stoppedInTime) {
                    Loggers.MSG.debug("{}: I/O workers stopped in time", toString());
                } else {
                    Loggers.ERR.debug("{}: I/O workers stopping timeout", toString());
                }
            } catch (InterruptedException interruption) {
                LogUtil.exception(Level.WARN, interruption, "Graceful I/O workers shutdown was interrupted", new Object[0]);
                Exceptions.throwUnchecked(interruption);
            }
        }
    }

    /**
     * Closes the connection pool and then delegates to the superclass. An {@link IOException}
     * thrown while closing the pool is only logged as a warning.
     *
     * @throws IOException declared by the original signature
     */
    protected void doClose() throws IllegalStateException, IOException {
        try {
            this.connPool.close();
        } catch (IOException closeFailure) {
            final Object[] logArgs = new Object[]{toString()};
            LogUtil.exception(Level.WARN, closeFailure, "{}: failed to close the connection pool", logArgs);
        }
        super.doClose();
    }
}
