/*
 * Decompiled with CFR 0.152.
 */
package com.c8db.internal.velocystream.internal;

import com.arangodb.velocypack.VPackSlice;
import com.c8db.C8DBException;
import com.c8db.Service;
import com.c8db.internal.C8Defaults;
import com.c8db.internal.net.Connection;
import com.c8db.internal.net.HostDescription;
import com.c8db.internal.velocystream.internal.Chunk;
import com.c8db.internal.velocystream.internal.ChunkStore;
import com.c8db.internal.velocystream.internal.Message;
import com.c8db.internal.velocystream.internal.MessageStore;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for a VelocyStream connection running over a plain or SSL socket.
 *
 * <p>{@link #open()} connects the socket, sends the protocol header and starts a
 * single background task that reads incoming chunks and hands them to a
 * {@link ChunkStore} backed by the connection's {@link MessageStore}.
 * {@link #open()}, {@link #close()} and {@link #writeIntern(Message, Collection)}
 * are {@code synchronized} on this instance; the read path runs on the background
 * task and is not.
 */
public abstract class VstConnection implements Connection {
    private static final Logger LOGGER = LoggerFactory.getLogger(VstConnection.class);

    /** Bytes sent once after connecting; encoded explicitly so the platform charset cannot alter them. */
    private static final byte[] PROTOCOL_HEADER = "VST/1.0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);

    /** Chunk head without message length: int length + int chunkX + long messageId. */
    private static final int SHORT_CHUNK_HEAD_LENGTH = 16;

    /** Chunk head that additionally carries the long message length. */
    private static final int LONG_CHUNK_HEAD_LENGTH = 24;

    private ExecutorService executor;
    protected final MessageStore messageStore;
    protected final Integer timeout;
    private final Long ttl;
    private final Boolean useSsl;
    private final SSLContext sslContext;
    private Socket socket;
    private OutputStream outputStream;
    private InputStream inputStream;
    private final HostDescription host;
    private final Service service;

    /**
     * Send time per message id, recorded only while debug logging is enabled.
     * Written by the (synchronized) write path and consumed by the reader task,
     * hence a concurrent map.
     */
    private final Map<Long, Long> sendTimestamps = new ConcurrentHashMap<Long, Long>();

    private final String connectionName;

    /**
     * Stores the connection settings; no socket is created until {@link #open()}.
     *
     * @param host         target host and port
     * @param timeout      connect timeout; when {@code null}, {@code C8Defaults.DEFAULT_TIMEOUT} is used
     * @param ttl          offset added to the open time after which the reader task may close the
     *                     connection, or {@code null} to disable that check
     * @param useSsl       {@code Boolean.TRUE} to create an SSL socket and perform a handshake
     * @param sslContext   context supplying the SSL socket factory, or {@code null} for the default factory
     * @param messageStore store that is cleared on close and on read failures
     * @param service      service this connection belongs to (only stored here)
     */
    protected VstConnection(HostDescription host, Integer timeout, Long ttl, Boolean useSsl, SSLContext sslContext, MessageStore messageStore, Service service) {
        this.host = host;
        this.timeout = timeout;
        this.ttl = ttl;
        this.useSsl = useSsl;
        this.sslContext = sslContext;
        this.messageStore = messageStore;
        this.service = service;
        // The prefix spelling is kept byte-for-byte: the name is emitted in logs and exposed via getConnectionName().
        this.connectionName = "conenction_" + System.currentTimeMillis() + "_" + Math.random();
        LOGGER.debug("Connection {} created", this.connectionName);
    }

    /**
     * @return {@code true} when a socket exists, has been connected and has not been closed
     */
    public boolean isOpen() {
        return this.socket != null && this.socket.isConnected() && !this.socket.isClosed();
    }

    /**
     * Connects the socket, performs the SSL handshake when enabled, sends the
     * protocol header and starts the background reader task. Does nothing when the
     * connection is already open.
     *
     * <p>If any step fails, the freshly created socket is closed before the failure
     * is rethrown, so a failed attempt neither leaks the socket nor leaves the
     * connection reporting itself as open without a reader task.
     *
     * @throws IOException if connecting, the handshake or sending the header fails
     */
    public synchronized void open() throws IOException {
        if (this.isOpen()) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Open connection to %s", this.host));
        }
        // equals() instead of ==: a Boolean that is not the canonical Boolean.TRUE instance must still enable SSL.
        final boolean ssl = Boolean.TRUE.equals(this.useSsl);
        if (ssl) {
            this.socket = this.sslContext != null
                    ? this.sslContext.getSocketFactory().createSocket()
                    : SSLSocketFactory.getDefault().createSocket();
        } else {
            this.socket = SocketFactory.getDefault().createSocket();
        }
        try {
            this.socket.connect(new InetSocketAddress(this.host.getHost(), this.host.getPort()), this.timeout != null ? this.timeout : C8Defaults.DEFAULT_TIMEOUT);
            this.socket.setKeepAlive(true);
            this.socket.setTcpNoDelay(true);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Connected to %s", this.socket));
            }
            this.outputStream = new BufferedOutputStream(this.socket.getOutputStream());
            this.inputStream = this.socket.getInputStream();
            if (ssl) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Start Handshake on %s", this.socket));
                }
                ((SSLSocket) this.socket).startHandshake();
            }
            this.sendProtocolHeader();
            this.executor = Executors.newSingleThreadExecutor();
            this.executor.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    VstConnection.this.receiveLoop();
                    return null;
                }
            });
        } catch (IOException e) {
            this.closeSocketQuietly();
            throw e;
        } catch (RuntimeException e) {
            this.closeSocketQuietly();
            throw e;
        }
    }

    /**
     * Body of the background reader task: reads chunks until the TTL has expired
     * (and the message store is empty), the socket is no longer open, or reading
     * fails. Every exit path closes the connection.
     */
    private void receiveLoop() {
        LOGGER.debug("Start Callable for {}", this.connectionName);
        final boolean hasTtl = this.ttl != null;
        final long expiresAt = hasTtl ? System.currentTimeMillis() + this.ttl : Long.MAX_VALUE;
        final ChunkStore chunkStore = new ChunkStore(this.messageStore);
        while (true) {
            // The TTL is only evaluated between chunks, and only honoured when nothing is pending.
            if (hasTtl && System.currentTimeMillis() > expiresAt && this.messageStore.isEmpty()) {
                this.close();
                break;
            }
            if (!this.isOpen()) {
                this.messageStore.clear(new IOException("The socket is closed."));
                this.close();
                break;
            }
            try {
                final Chunk chunk = this.readChunk();
                final ByteBuffer chunkBuffer = chunkStore.storeChunk(chunk);
                // The content is always consumed: the chunk head has already been read, so leaving the
                // content on the stream would make the next readChunk() parse content bytes as a head.
                final byte[] content = new byte[chunk.getContentLength()];
                this.readBytesIntoBuffer(content, 0, content.length);
                // NOTE(review): a null buffer is taken to mean "no destination for this chunk",
                // so its content is dropped - confirm against ChunkStore.storeChunk.
                if (chunkBuffer != null) {
                    chunkBuffer.put(content);
                    chunkStore.checkCompleteness(chunk.getMessageId());
                }
            } catch (Exception e) {
                // Any failure while reading ends the loop; the cause is handed to the message store.
                this.messageStore.clear(e);
                this.close();
                break;
            }
        }
        LOGGER.debug("Stop Callable for {}", this.connectionName);
    }

    /** Closes the socket after a failed {@link #open()}; a close failure is only logged so the original error propagates. */
    private void closeSocketQuietly() {
        try {
            this.socket.close();
        } catch (IOException ignored) {
            LOGGER.debug("Closing socket of {} after failed open also failed", this.connectionName);
        }
    }

    /**
     * Clears the message store and recorded send timestamps, shuts down the reader
     * executor and closes the socket if it is not closed yet.
     *
     * @throws C8DBException if closing the socket fails
     */
    @Override
    public synchronized void close() {
        this.messageStore.clear();
        this.sendTimestamps.clear();
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdown();
        }
        if (this.socket != null && !this.socket.isClosed()) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Close connection %s", this.socket));
                }
                this.socket.close();
            } catch (IOException e) {
                throw new C8DBException(e);
            }
        }
    }

    /**
     * Writes and flushes the protocol header.
     *
     * @throws IOException if writing to the socket fails
     */
    private synchronized void sendProtocolHeader() throws IOException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Send velocystream protocol header to %s", this.socket));
        }
        this.outputStream.write(PROTOCOL_HEADER);
        this.outputStream.flush();
    }

    /**
     * Writes the given chunks of {@code message} to the socket in iteration order,
     * flushing after each chunk. A chunk's content range is addressed as if the
     * message head were immediately followed by the message body.
     *
     * @param message message whose head and body buffers provide the chunk content
     * @param chunks  chunks describing which byte ranges to send
     * @throws C8DBException wrapping the {@link IOException} if a write fails
     */
    protected synchronized void writeIntern(Message message, Collection<Chunk> chunks) throws C8DBException {
        for (Chunk chunk : chunks) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("Send chunk %s:%s from message %s", chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
                    this.sendTimestamps.put(chunk.getMessageId(), System.currentTimeMillis());
                }
                this.writeChunkHead(chunk);
                final int contentOffset = chunk.getContentOffset();
                final int contentLength = chunk.getContentLength();
                final VPackSlice head = message.getHead();
                final int headLength = head.getByteSize();
                int written = 0;
                if (contentOffset < headLength) {
                    // The chunk starts inside the head: send as much of the head as the chunk covers.
                    written = Math.min(contentLength, headLength - contentOffset);
                    this.outputStream.write(head.getBuffer(), contentOffset, written);
                }
                if (written < contentLength) {
                    // The remainder comes from the body; translate the message-relative offset into a body offset.
                    final VPackSlice body = message.getBody();
                    this.outputStream.write(body.getBuffer(), contentOffset + written - headLength, contentLength - written);
                }
                this.outputStream.flush();
            } catch (IOException e) {
                LOGGER.error("Error on Connection {}", this.connectionName);
                throw new C8DBException(e);
            }
        }
    }

    /**
     * Writes the little-endian chunk head: total length, chunkX, message id and,
     * when the chunk's message length is greater than -1, the message length.
     *
     * @throws IOException if writing to the socket fails
     */
    private synchronized void writeChunkHead(Chunk chunk) throws IOException {
        final long messageLength = chunk.getMessageLength();
        final boolean withMessageLength = messageLength > -1L;
        final int headLength = withMessageLength ? LONG_CHUNK_HEAD_LENGTH : SHORT_CHUNK_HEAD_LENGTH;
        final ByteBuffer buffer = ByteBuffer.allocate(headLength).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(chunk.getContentLength() + headLength);
        buffer.putInt(chunk.getChunkX());
        buffer.putLong(chunk.getMessageId());
        if (withMessageLength) {
            buffer.putLong(messageLength);
        }
        this.outputStream.write(buffer.array());
    }

    /**
     * Reads one chunk head from the socket and returns the described chunk; the
     * chunk content itself is left on the stream for the caller to read.
     *
     * @return chunk with content offset 0 and the content length derived from the head
     * @throws IOException if the stream ends or reading fails
     */
    protected Chunk readChunk() throws IOException {
        final ByteBuffer chunkHeadBuffer = this.readBytes(SHORT_CHUNK_HEAD_LENGTH);
        final int length = chunkHeadBuffer.getInt();
        final int chunkX = chunkHeadBuffer.getInt();
        final long messageId = chunkHeadBuffer.getLong();
        final long messageLength;
        final int contentLength;
        // Low bit set and remaining bits > 1: the head carries an extra 8-byte message length.
        // Presumably this identifies the first chunk of a multi-chunk message - confirm against Chunk.
        if (1 == (chunkX & 1) && chunkX >> 1 > 1) {
            messageLength = this.readBytes(LONG_CHUNK_HEAD_LENGTH - SHORT_CHUNK_HEAD_LENGTH).getLong();
            contentLength = length - LONG_CHUNK_HEAD_LENGTH;
        } else {
            messageLength = -1L;
            contentLength = length - SHORT_CHUNK_HEAD_LENGTH;
        }
        final Chunk chunk = new Chunk(messageId, chunkX, messageLength, 0, contentLength);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("Received chunk %s:%s from message %s", chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
            // A timestamp exists only if debug logging was enabled when the message was sent; it is
            // removed on first use so the map cannot grow without bound.
            final Long sentAt = this.sendTimestamps.remove(Long.valueOf(messageId));
            if (sentAt != null) {
                LOGGER.debug("Responsetime for Message {} is {}", chunk.getMessageId(), System.currentTimeMillis() - sentAt);
            }
        }
        return chunk;
    }

    /**
     * Reads exactly {@code len} bytes and wraps them in a little-endian buffer.
     *
     * @throws IOException if the stream ends before {@code len} bytes were read
     */
    private ByteBuffer readBytes(int len) throws IOException {
        final byte[] buf = new byte[len];
        this.readBytesIntoBuffer(buf, 0, len);
        return ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * Reads from the socket until {@code len} bytes have been stored in
     * {@code buf} starting at {@code off}.
     *
     * @param buf destination array
     * @param off index in {@code buf} of the first byte to write
     * @param len number of bytes to read
     * @throws IOException if the stream ends before {@code len} bytes were read
     */
    protected void readBytesIntoBuffer(byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            final int count = this.inputStream.read(buf, off + total, len - total);
            if (count == -1) {
                throw new IOException("Reached the end of the stream.");
            }
            total += count;
        }
    }

    /**
     * @return the name generated for this connection at construction time
     */
    public String getConnectionName() {
        return this.connectionName;
    }
}

