/*
 * Decompiled with CFR 0.152.
 */
package org.hpccsystems.dfs.client;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.hpccsystems.commons.benchmarking.IMetric;
import org.hpccsystems.commons.benchmarking.IProfilable;
import org.hpccsystems.commons.benchmarking.SimpleMetric;
import org.hpccsystems.commons.benchmarking.Units;
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.ecl.RecordDefinitionTranslator;
import org.hpccsystems.commons.errors.HpccFileException;
import org.hpccsystems.commons.network.Network;
import org.hpccsystems.dfs.client.DataPartition;

public class RowServiceInputStream
extends InputStream
implements IProfilable {
    private AtomicBoolean active = new AtomicBoolean(false);
    private AtomicBoolean closed = new AtomicBoolean(false);
    private boolean simulateFail = false;
    private boolean forceTokenUse = false;
    private boolean inFetchingMode = false;
    private byte[] tokenBin;
    private int handle;
    private DataPartition dataPart;
    private FieldDef recordDefinition = null;
    private String jsonRecordDefinition = null;
    private FieldDef projectedRecordDefinition = null;
    private String projectedJsonRecordDefinition = null;
    private DataInputStream dis;
    private DataOutputStream dos;
    private int filePartCopyIndexPointer = 0;
    private List<Integer> prioritizedCopyIndexes = new ArrayList<Integer>();
    private Thread prefetchThread = null;
    private HpccFileException prefetchException = null;
    private final Semaphore bufferWriteMutex = new Semaphore(1);
    private byte[] readBuffer;
    private AtomicInteger readBufferCapacity = new AtomicInteger(0);
    private AtomicInteger readBufferDataLen = new AtomicInteger(0);
    private int readPos = 0;
    private int markPos = -1;
    private int readLimit = -1;
    private int recordLimit = -1;
    private int totalDataInCurrentRequest = 0;
    private int remainingDataInCurrentRequest = 0;
    private long streamPos = 0L;
    private long streamPosOfFetchStart = 0L;
    private List<Long> streamPosOfFetches = new ArrayList<Long>();
    private List<byte[]> tokenBinOfFetches = new ArrayList<byte[]>();
    private List<Long> fetchRequestOffsets = new ArrayList<Long>();
    private long firstByteTimeNS = -1L;
    private long mutexWaitTimeNS = 0L;
    private long waitTimeNS = 0L;
    private long sleepTimeNS = 0L;
    private long fetchStartTimeNS = 0L;
    private long fetchTimeNS = 0L;
    private long fetchFinishTimeNS = 0L;
    private long closeTimeNS = 0L;
    private int numLongWaits = 0;
    private int numFetches = 0;
    private long numPartialBlockReads = 0L;
    private long numBlockReads = 0L;
    private Socket sock;
    public static final int DEFAULT_CONNECT_TIMEOUT_MILIS = 5000;
    private int connectTimeout = 5000;
    private static final Charset HPCCCharSet = Charset.forName("ISO-8859-1");
    private static final int DEFAULT_MAX_READ_SIZE_KB = 4096;
    private static final int PREFETCH_SLEEP_MS = 1;
    private static final int LONG_WAIT_THRESHOLD_US = 100;
    private static final int MIN_SOCKET_READ_SIZE = 512;
    private static final Logger log = LogManager.getLogger(RowServiceInputStream.class);
    private int maxReadSizeKB = 4096;
    private int bufferPrefetchThresholdKB = 2048;
    private int bufferCompactThresholdKB = 1024;
    public static final String BYTES_READ_METRIC = "bytesRead";
    public static final String FIRST_BYTE_TIME_METRIC = "prefetchFirstByteTime";
    public static final String WAIT_TIME_METRIC = "parseWaitTime";
    public static final String MUTEX_WAIT_TIME_METRIC = "mutexWaitTime";
    public static final String SLEEP_TIME_METRIC = "prefetchSleepTime";
    public static final String FETCH_START_TIME_METRIC = "fetchRequestStartTime";
    public static final String FETCH_TIME_METRIC = "fetchRequestReadTime";
    public static final String FETCH_FINISH_TIME_METRIC = "fetchRequestFinishTime";
    public static final String CLOSE_TIME_METRIC = "connectionCloseTime";
    public static final String LONG_WAITS_METRIC = "numLongWaits";
    public static final String FETCHES_METRIC = "numFetches";
    public static final String PARTIAL_BLOCK_READS_METRIC = "numPartialBlockReads";
    public static final String BLOCK_READS_METRIC = "numBlockReads";

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd) throws Exception {
        this(dp, rd, pRd, 5000);
    }

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout) throws Exception {
        this(dp, rd, pRd, connectTimeout, -1);
    }

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit) throws Exception {
        this(dp, rd, pRd, connectTimeout, limit, true, -1);
    }

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB) throws Exception {
        this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, null, false);
    }

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restart) throws Exception {
        this(dp, rd, pRd, connectTimeout, limit, createPrefetchThread, maxReadSizeInKB, restart, false);
    }

    public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int connectTimeout, int limit, boolean createPrefetchThread, int maxReadSizeInKB, RestartInformation restartInfo, boolean isFetching) throws Exception {
        this.recordDefinition = rd;
        this.projectedRecordDefinition = pRd;
        this.inFetchingMode = isFetching;
        if (maxReadSizeInKB > 0) {
            this.maxReadSizeKB = maxReadSizeInKB;
        }
        this.bufferPrefetchThresholdKB = this.maxReadSizeKB / 2;
        this.bufferCompactThresholdKB = this.maxReadSizeKB / 4;
        this.jsonRecordDefinition = RecordDefinitionTranslator.toJsonRecord(this.recordDefinition).toString();
        this.projectedJsonRecordDefinition = RecordDefinitionTranslator.toJsonRecord(this.projectedRecordDefinition).toString();
        this.dataPart = dp;
        int copycount = this.dataPart.getCopyCount();
        for (int index = 0; index < copycount; ++index) {
            String currentCopyLocation = this.dataPart.getCopyIP(index);
            if (Network.isLocalAddress(currentCopyLocation)) {
                this.prioritizedCopyIndexes.add(0, index);
                continue;
            }
            this.prioritizedCopyIndexes.add(index);
        }
        this.handle = 0;
        this.tokenBin = null;
        this.simulateFail = false;
        this.connectTimeout = connectTimeout;
        this.recordLimit = limit;
        this.readBufferCapacity.set(this.maxReadSizeKB * 1024 * 2);
        this.readBuffer = new byte[this.readBufferCapacity.get()];
        if (restartInfo != null) {
            this.tokenBin = restartInfo.tokenBin;
            this.streamPosOfFetchStart = this.streamPos = restartInfo.streamPos;
        }
        if (!this.inFetchingMode) {
            this.makeActive();
        } else {
            AtomicBoolean blockingRequestFinished = new AtomicBoolean(false);
            Thread tempFetchThread = new Thread(() -> {
                try {
                    Long[] emptyOffsets = new Long[]{0L};
                    this.startBlockingFetchRequest(Arrays.asList(emptyOffsets));
                }
                catch (Exception e) {
                    this.prefetchException = new HpccFileException("Error while batch fetch warm starting: " + e.getMessage());
                }
                blockingRequestFinished.set(true);
            });
            tempFetchThread.start();
            while (!blockingRequestFinished.get()) {
                try {
                    long avail = this.available();
                    if (avail <= 0L) continue;
                    this.skip(avail);
                    Thread.sleep(1L);
                }
                catch (IOException e) {
                    // empty catch block
                    break;
                }
            }
            tempFetchThread.join();
            if (this.prefetchException != null) {
                throw this.prefetchException;
            }
        }
        if (createPrefetchThread) {
            final RowServiceInputStream rowInputStream = this;
            Runnable prefetchTask = new Runnable(){
                RowServiceInputStream inputStream;
                {
                    this.inputStream = rowInputStream;
                }

                @Override
                public void run() {
                    while (!this.inputStream.isClosed()) {
                        if (this.inputStream.getRemainingBufferCapacity() <= this.inputStream.bufferPrefetchThresholdKB * 1024) {
                            try {
                                Thread.sleep(1L);
                            }
                            catch (Exception exception) {
                                // empty catch block
                            }
                        }
                        this.inputStream.prefetchData();
                    }
                }
            };
            this.prefetchThread = new Thread(prefetchTask);
            this.prefetchThread.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    RestartInformation getRestartInformationForStreamPos(long streamPos) {
        RestartInformation restartInfo = new RestartInformation();
        List<Long> list = this.streamPosOfFetches;
        synchronized (list) {
            for (int i = this.streamPosOfFetches.size() - 1; i >= 0; --i) {
                Long fetchStreamPos = this.streamPosOfFetches.get(i);
                if (fetchStreamPos > streamPos) continue;
                restartInfo.streamPos = fetchStreamPos;
                restartInfo.tokenBin = this.tokenBinOfFetches.get(i);
                break;
            }
        }
        return restartInfo;
    }

    private boolean setNextFilePartCopy() {
        if (this.filePartCopyIndexPointer + 1 >= this.prioritizedCopyIndexes.size()) {
            return false;
        }
        ++this.filePartCopyIndexPointer;
        return true;
    }

    public boolean getUseSSL() {
        return this.dataPart.getUseSsl();
    }

    public String getIP() {
        return this.dataPart.getCopyIP(this.prioritizedCopyIndexes.get(this.getFilePartCopy()));
    }

    private int getFilePartCopy() {
        return this.filePartCopyIndexPointer;
    }

    public int getPort() {
        return this.dataPart.getPort();
    }

    public String getTrans() {
        return this.makeInitialRequest();
    }

    public boolean isActive() {
        return this.active.get();
    }

    public boolean isClosed() {
        return this.closed.get();
    }

    public int getHandle() {
        return this.handle;
    }

    public boolean setSimulateFail(boolean v) {
        boolean old = this.simulateFail;
        this.simulateFail = v;
        return old;
    }

    public void setForceTokenUse(boolean v) {
        this.forceTokenUse = v;
    }

    public void setForceHandleUse(boolean v) {
    }

    public int getRemainingBufferCapacity() {
        int totalBufferCapacity = this.readBufferCapacity.get();
        int currentBufferLen = this.readBufferDataLen.get();
        return totalBufferCapacity - currentBufferLen;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void startBlockingFetchRequest(List<Long> fetchOffsets) throws Exception {
        if (!this.inFetchingMode) {
            throw new Exception("Error: attempted to start a fetch request for an input stream in sequential read mode.");
        }
        this.readPos = 0;
        this.totalDataInCurrentRequest = 0;
        this.remainingDataInCurrentRequest = 0;
        this.streamPosOfFetchStart = 0L;
        List<Long> list = this.streamPosOfFetches;
        synchronized (list) {
            this.streamPosOfFetches.clear();
            this.tokenBinOfFetches.clear();
        }
        this.fetchRequestOffsets.clear();
        this.fetchRequestOffsets.addAll(fetchOffsets);
        this.remainingDataInCurrentRequest = this.totalDataInCurrentRequest = this.startFetch();
        while (this.remainingDataInCurrentRequest > 0) {
            this.readDataInFetch();
        }
        if (this.remainingDataInCurrentRequest == 0) {
            this.finishFetch();
        }
    }

    private int startFetch() {
        block42: {
            if (this.closed.get()) {
                return -1;
            }
            ++this.numFetches;
            if (!this.active.get()) {
                try {
                    this.makeActive();
                }
                catch (HpccFileException e) {
                    this.prefetchException = e;
                    try {
                        this.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    return -1;
                }
            }
            if (this.inFetchingMode) {
                if (this.simulateFail) {
                    this.handle = -1;
                }
                String readAheadRequest = this.forceTokenUse ? this.makeTokenRequest() : this.makeHandleRequest();
                try {
                    int requestLen = readAheadRequest.length();
                    this.dos.writeInt(requestLen);
                    this.dos.write(readAheadRequest.getBytes(HPCCCharSet), 0, requestLen);
                    this.dos.flush();
                }
                catch (IOException e) {
                    this.prefetchException = new HpccFileException("Failure sending read ahead transaction", e);
                    try {
                        this.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
            int len = 0;
            try {
                len = this.readReplyLen();
            }
            catch (HpccFileException e) {
                this.prefetchException = e;
                try {
                    this.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                return -1;
            }
            if (len == 0) {
                try {
                    this.close();
                }
                catch (IOException e) {
                    this.prefetchException = new HpccFileException(e.getMessage());
                }
                return -1;
            }
            if (len < 4) {
                this.prefetchException = new HpccFileException("Early data termination, no handle");
                try {
                    this.close();
                }
                catch (Exception e) {
                    // empty catch block
                }
                return -1;
            }
            try {
                this.handle = this.dis.readInt();
                if (this.handle != 0) break block42;
                try {
                    len = this.retryWithToken();
                    log.warn("Unable to make request with handle, retyring with token.");
                }
                catch (HpccFileException e) {
                    this.prefetchException = e;
                    try {
                        this.close();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    return -1;
                }
                if (len == 0) {
                    this.close();
                    return -1;
                }
                if (len < 4) {
                    this.prefetchException = new HpccFileException("Early data termination, no handle");
                    try {
                        this.close();
                    }
                    catch (Exception e) {
                        // empty catch block
                    }
                    return -1;
                }
                this.handle = this.dis.readInt();
                if (this.handle == 0) {
                    this.prefetchException = new HpccFileException("Read retry failed");
                    try {
                        this.close();
                    }
                    catch (Exception e) {}
                }
            }
            catch (IOException e) {
                this.prefetchException = new HpccFileException("Error during read block", e);
                try {
                    this.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
        int dataLen = 0;
        try {
            dataLen = this.dis.readInt();
            if (!this.inFetchingMode && dataLen == 0) {
                this.close();
                return 0;
            }
        }
        catch (IOException e) {
            this.prefetchException = new HpccFileException("Error during read block", e);
            try {
                this.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        return dataLen;
    }

    private void readDataInFetch() {
        if (this.closed.get()) {
            return;
        }
        while (this.remainingDataInCurrentRequest > 0) {
            this.bufferWriteMutex.acquireUninterruptibly();
            int totalBufferCapacity = this.readBufferCapacity.get();
            int currentBufferLen = this.readBufferDataLen.get();
            int remainingBufferCapacity = totalBufferCapacity - currentBufferLen;
            remainingBufferCapacity = totalBufferCapacity - currentBufferLen;
            int bytesToRead = 0;
            try {
                bytesToRead = this.dis.available();
                bytesToRead = Math.min(remainingBufferCapacity, Math.min(bytesToRead, this.remainingDataInCurrentRequest));
                this.dis.readFully(this.readBuffer, currentBufferLen, bytesToRead);
            }
            catch (IOException e) {
                this.prefetchException = new HpccFileException("Error during read block", e);
                try {
                    this.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.readBufferDataLen.addAndGet(bytesToRead);
            this.remainingDataInCurrentRequest -= bytesToRead;
            this.bufferWriteMutex.release();
            if (this.readBufferDataLen.get() <= this.bufferPrefetchThresholdKB * 1024) continue;
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void finishFetch() {
        if (this.closed.get()) {
            return;
        }
        try {
            if (this.dis == null) {
                return;
            }
            int tokenLen = this.dis.readInt();
            if (tokenLen == 0) {
                this.close();
                return;
            }
            if (this.tokenBin == null || tokenLen > this.tokenBin.length) {
                this.tokenBin = new byte[tokenLen];
            }
            this.dis.readFully(this.tokenBin, 0, tokenLen);
            this.streamPosOfFetchStart += (long)this.totalDataInCurrentRequest;
            List<Long> list = this.streamPosOfFetches;
            synchronized (list) {
                this.streamPosOfFetches.add(this.streamPosOfFetchStart);
                this.tokenBinOfFetches.add(this.tokenBin);
            }
        }
        catch (IOException e) {
            this.prefetchException = new HpccFileException("Error during read block", e);
            try {
                this.close();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        if (!this.inFetchingMode) {
            if (this.simulateFail) {
                this.handle = -1;
            }
            String readAheadRequest = this.forceTokenUse ? this.makeTokenRequest() : this.makeHandleRequest();
            try {
                int requestLen = readAheadRequest.length();
                this.dos.writeInt(requestLen);
                this.dos.write(readAheadRequest.getBytes(HPCCCharSet), 0, requestLen);
                this.dos.flush();
            }
            catch (IOException e) {
                this.prefetchException = new HpccFileException("Failure sending read ahead transaction", e);
                try {
                    this.close();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
        }
    }

    public void prefetchData() {
        if (this.remainingDataInCurrentRequest > 0) {
            this.readDataInFetch();
            if (this.remainingDataInCurrentRequest == 0) {
                this.finishFetch();
            }
        } else {
            this.remainingDataInCurrentRequest = this.totalDataInCurrentRequest = this.startFetch();
            this.readDataInFetch();
            if (this.remainingDataInCurrentRequest == 0) {
                this.finishFetch();
            }
        }
    }

    private void compactBuffer() {
        this.bufferWriteMutex.acquireUninterruptibly();
        if (this.readPos >= this.bufferCompactThresholdKB * 1024) {
            int compactStartLoc = this.readPos;
            if (this.markPos >= 0) {
                int markOffset = this.readPos - this.markPos;
                if (markOffset <= this.readLimit) {
                    compactStartLoc = this.markPos;
                    this.markPos = 0;
                } else {
                    this.markPos = -1;
                }
            }
            int remainingBytesInBuffer = this.readBufferDataLen.addAndGet(-compactStartLoc);
            System.arraycopy(this.readBuffer, compactStartLoc, this.readBuffer, 0, remainingBytesInBuffer);
            this.readPos -= compactStartLoc;
        }
        this.bufferWriteMutex.release();
    }

    @Override
    public int available() throws IOException {
        int bufferLen;
        int availBytes;
        if (this.closed.get() && (availBytes = (bufferLen = this.readBufferDataLen.get()) - this.readPos) == 0) {
            throw new IOException("End of input stream.");
        }
        bufferLen = this.readBufferDataLen.get();
        availBytes = bufferLen - this.readPos;
        return availBytes;
    }

    @Override
    public void close() throws IOException {
        if (!this.closed.getAndSet(true)) {
            if (this.prefetchThread != null && Thread.currentThread() != this.prefetchThread) {
                try {
                    this.prefetchThread.join();
                }
                catch (Exception exception) {
                    // empty catch block
                }
            }
            this.dos.close();
            if (this.dis != null) {
                this.dis.close();
            }
            this.sock.close();
            this.dos = null;
            this.dis = null;
            this.sock = null;
        }
    }

    @Override
    public void mark(int readLim) {
        this.bufferWriteMutex.acquireUninterruptibly();
        int availableReadCapacity = this.readBufferCapacity.get() - this.readPos;
        this.readLimit = readLim;
        if (availableReadCapacity <= this.readLimit) {
            int requiredBufferLength = this.readPos + this.readLimit;
            int remainingBytesInBuffer = this.readBufferDataLen.addAndGet(-this.readPos);
            if (this.readBufferCapacity.get() >= requiredBufferLength) {
                System.arraycopy(this.readBuffer, this.readPos, this.readBuffer, 0, remainingBytesInBuffer);
                this.readPos = 0;
            } else {
                byte[] newBuffer = new byte[requiredBufferLength];
                System.arraycopy(this.readBuffer, this.readPos, newBuffer, 0, remainingBytesInBuffer);
                this.readBuffer = newBuffer;
                this.readBufferCapacity.set(requiredBufferLength);
                this.readPos = 0;
            }
        }
        this.markPos = this.readPos;
        this.bufferWriteMutex.release();
    }

    @Override
    public boolean markSupported() {
        return true;
    }

    @Override
    public int read() throws IOException {
        if (this.prefetchException != null) {
            throw new IOException(this.prefetchException.getMessage());
        }
        long waitNS = 0L;
        try {
            while (this.available() < 1) {
            }
        }
        catch (IOException e) {
            return -1;
        }
        int ret = this.readBuffer[this.readPos] + 128;
        ++this.readPos;
        ++this.streamPos;
        this.compactBuffer();
        return ret;
    }

    @Override
    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (this.prefetchException != null) {
            throw new IOException(this.prefetchException.getMessage());
        }
        int available = 0;
        try {
            available = this.available();
        }
        catch (IOException e) {
            return -1;
        }
        int bytesToRead = len;
        if (bytesToRead > available) {
            bytesToRead = available;
            ++this.numPartialBlockReads;
        }
        ++this.numBlockReads;
        System.arraycopy(this.readBuffer, this.readPos, b, off, bytesToRead);
        this.readPos += bytesToRead;
        this.streamPos += (long)bytesToRead;
        this.compactBuffer();
        return bytesToRead;
    }

    @Override
    public void reset() throws IOException {
        if (this.prefetchException != null) {
            throw new IOException(this.prefetchException.getMessage());
        }
        if (this.markPos < 0) {
            throw new IOException("Unable to reset to marked position. Either a mark has not been set or the reset length exceeds internal buffer length.");
        }
        this.streamPos -= (long)(this.readPos - this.markPos);
        this.readPos = this.markPos;
        this.markPos = -1;
    }

    @Override
    public long skip(long n) throws IOException {
        long remainingBytesToSkip;
        int bytesToSkip;
        if (this.prefetchException != null) {
            throw new IOException(this.prefetchException.getMessage());
        }
        for (remainingBytesToSkip = n; remainingBytesToSkip > 0L; remainingBytesToSkip -= (long)bytesToSkip) {
            int available = 0;
            try {
                available = this.available();
            }
            catch (IOException e) {
                break;
            }
            bytesToSkip = (int)remainingBytesToSkip;
            if (bytesToSkip > available) {
                bytesToSkip = available;
            }
            this.readPos += bytesToSkip;
            this.compactBuffer();
        }
        long bytesSkipped = n - remainingBytesToSkip;
        this.streamPos += bytesSkipped;
        return bytesSkipped;
    }

    @Override
    public List<IMetric> getMetrics() {
        ArrayList<IMetric> metrics = new ArrayList<IMetric>();
        metrics.add(new SimpleMetric(this.streamPos, BYTES_READ_METRIC, new Units(Units.Type.BYTES)));
        metrics.add(new SimpleMetric(this.firstByteTimeNS, FIRST_BYTE_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.waitTimeNS, WAIT_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.sleepTimeNS, SLEEP_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.fetchStartTimeNS, FETCH_START_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.fetchTimeNS, FETCH_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.fetchFinishTimeNS, FETCH_FINISH_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.closeTimeNS, CLOSE_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.mutexWaitTimeNS, MUTEX_WAIT_TIME_METRIC, new Units(Units.Type.SECONDS, Units.Scale.NANO)));
        metrics.add(new SimpleMetric(this.numLongWaits, LONG_WAITS_METRIC, new Units(Units.Type.COUNT)));
        metrics.add(new SimpleMetric(this.numFetches, FETCHES_METRIC, new Units(Units.Type.COUNT)));
        metrics.add(new SimpleMetric(this.numPartialBlockReads, PARTIAL_BLOCK_READS_METRIC, new Units(Units.Type.COUNT)));
        metrics.add(new SimpleMetric(this.numBlockReads, BLOCK_READS_METRIC, new Units(Units.Type.COUNT)));
        return metrics;
    }

    private void makeActive() throws HpccFileException {
        this.active.set(false);
        this.handle = 0;
        while (true) {
            try {
                log.debug("Attempting to connect to file part : '" + this.dataPart.getThisPart() + "' Copy: '" + (this.getFilePartCopy() + 1) + "' on IP: '" + this.getIP() + "'");
                try {
                    if (this.getUseSSL()) {
                        SSLSocketFactory ssf = (SSLSocketFactory)SSLSocketFactory.getDefault();
                        this.sock = (SSLSocket)ssf.createSocket();
                        this.sock.setPerformancePreferences(0, 1, 2);
                        this.sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
                        log.debug("Attempting SSL handshake...");
                        ((SSLSocket)this.sock).startHandshake();
                        log.debug("SSL handshake successful...");
                        log.debug("   Remote address = " + this.sock.getInetAddress().toString() + " Remote port = " + this.sock.getPort());
                    } else {
                        SocketFactory sf = SocketFactory.getDefault();
                        this.sock = sf.createSocket();
                        this.sock.setPerformancePreferences(0, 1, 2);
                        this.sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
                    }
                    log.debug("Connected: Remote address = " + this.sock.getInetAddress().toString() + " Remote port = " + this.sock.getPort());
                }
                catch (UnknownHostException e) {
                    throw new HpccFileException("Bad file part addr " + this.getIP(), e);
                }
                catch (IOException e) {
                    throw new HpccFileException(e);
                }
                try {
                    this.dos = new DataOutputStream(this.sock.getOutputStream());
                    this.dis = new DataInputStream(this.sock.getInputStream());
                }
                catch (IOException e) {
                    throw new HpccFileException("Failed to create streams", e);
                }
                this.active.set(true);
                try {
                    String readTrans = null;
                    if (this.tokenBin == null) {
                        this.tokenBin = new byte[0];
                        readTrans = this.makeInitialRequest();
                    } else {
                        readTrans = this.makeTokenRequest();
                    }
                    int transLen = readTrans.length();
                    this.dos.writeInt(transLen);
                    this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
                    this.dos.flush();
                }
                catch (IOException e) {
                    throw new HpccFileException("Failed on initial remote read read trans", e);
                }
                return;
            }
            catch (Exception e) {
                log.error("Could not reach file part: '" + this.dataPart.getThisPart() + "' copy: '" + (this.getFilePartCopy() + 1) + "' on IP: '" + this.getIP() + "'");
                log.error(e.getMessage());
                if (this.setNextFilePartCopy()) continue;
                throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
            }
            break;
        }
    }

    private void makeFetchObject(StringBuilder sb) {
        if (this.inFetchingMode) {
            sb.append("\"fetch\" : {\n");
            for (int i = 0; i < this.fetchRequestOffsets.size(); ++i) {
                sb.append("\"fpos\" : " + this.fetchRequestOffsets.get(i));
                if (i != this.fetchRequestOffsets.size()) {
                    sb.append(",\n");
                    continue;
                }
                sb.append("\n");
            }
            sb.append("},\n");
        }
    }

    private String makeInitialRequest() {
        StringBuilder sb = new StringBuilder(2048);
        sb.append('+');
        sb.append("{ \"format\" : \"binary\", \n");
        sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");
        if (this.inFetchingMode) {
            this.makeFetchObject(sb);
        }
        sb.append(this.makeNodeObject());
        sb.append("\n}\n");
        return sb.toString();
    }

    private String makeNodeObject() {
        boolean needToSpecifyFileType;
        StringBuilder sb = new StringBuilder(50 + this.jsonRecordDefinition.length() + this.projectedJsonRecordDefinition.length());
        sb.append(" \"node\" : {\n ");
        DataPartition.FileType fileType = this.dataPart.getFileType();
        boolean bl = needToSpecifyFileType = !fileType.typeCanBeDeduced();
        if (needToSpecifyFileType) {
            sb.append("\"kind\" : \"");
            sb.append(fileType.toString() + "read\",\n");
        }
        sb.append("\"metaInfo\" : \"");
        sb.append(this.dataPart.getFileAccessBlob());
        sb.append("\",\n \"filePart\" : \"");
        sb.append(this.dataPart.getThisPart());
        sb.append("\", \n");
        sb.append("\"filePartCopy\" : \"");
        sb.append(this.getFilePartCopy() + 1);
        sb.append("\", \n");
        if (!this.inFetchingMode) {
            FileFilter fileFilter = this.dataPart.getFilter();
            if (fileFilter != null && !fileFilter.isEmpty()) {
                sb.append(" ");
                sb.append(this.dataPart.getFilter().toJson());
                sb.append(",\n");
            }
            if (this.recordLimit > -1) {
                sb.append("\"chooseN\" : \"" + this.recordLimit + "\",\n");
            }
        }
        sb.append("\n \"input\" : ");
        sb.append(this.jsonRecordDefinition);
        sb.append(", \n \"output\" : ");
        sb.append(this.projectedJsonRecordDefinition);
        sb.append("\n }");
        return sb.toString();
    }

    private String makeHandleRequest() {
        StringBuilder sb = new StringBuilder(100);
        sb.append('+');
        sb.append("{ \"format\" : \"binary\",\n");
        sb.append("  \"handle\" : \"" + Integer.toString(this.handle) + "\",");
        if (this.inFetchingMode) {
            this.makeFetchObject(sb);
        }
        sb.append("\n}");
        return sb.toString();
    }

    private String makeTokenRequest() {
        StringBuilder sb = new StringBuilder(130 + this.jsonRecordDefinition.length() + this.projectedJsonRecordDefinition.length() + (int)((double)this.tokenBin.length * 1.4));
        sb.append('+');
        sb.append("{ \"format\" : \"binary\",\n");
        sb.append("\"replyLimit\" : " + this.maxReadSizeKB + ",\n");
        sb.append(this.makeNodeObject());
        sb.append(",\n");
        sb.append("  \"cursorBin\" : \"");
        sb.append(Base64.getEncoder().encodeToString(this.tokenBin));
        sb.append("\" \n}\n");
        return sb.toString();
    }

    private int readReplyLen() throws HpccFileException {
        int len = 0;
        boolean hi_flag = false;
        try {
            len = this.dis.readInt();
            if (len < 0) {
                hi_flag = true;
                len &= Integer.MAX_VALUE;
            }
            if (len == 0) {
                return 0;
            }
            int status = this.dis.readInt();
            len -= 4;
            if (status != 0) {
                StringBuilder sb = new StringBuilder();
                sb.append("\nReceived ERROR from Thor node (");
                sb.append(this.getIP());
                sb.append("): Code: '");
                sb.append(status);
                sb.append("'");
                if (len > 0) {
                    byte[] message = new byte[len];
                    this.dis.readFully(message, 0, len);
                    sb.append(" Message: '");
                    sb.append(new String(message));
                    sb.append("'");
                }
                switch (status) {
                    case -6: {
                        sb.append("\nInvalid file access expiry reported - change File Access Expiry (HPCCFile) and retry");
                        break;
                    }
                    case -7: {
                        sb.append("\nFile access expired before initial request - Retry and consider increasing File Access Expiry (HPCCFile)");
                        break;
                    }
                }
                throw new HpccFileException(sb.toString());
            }
        }
        catch (IOException e) {
            throw new HpccFileException("Error during read block", e);
        }
        return len;
    }

    private int retryWithToken() throws HpccFileException {
        String retryTrans = this.makeTokenRequest();
        int len = retryTrans.length();
        try {
            this.dos.writeInt(len);
            this.dos.write(retryTrans.getBytes(HPCCCharSet), 0, len);
            this.dos.flush();
        }
        catch (IOException e) {
            throw new HpccFileException("Failed on remote read read retry", e);
        }
        return this.readReplyLen();
    }

    public static class RestartInformation {
        public long streamPos = 0L;
        public byte[] tokenBin = null;
    }
}

