/*
 * Decompiled with CFR 0.152.
 */
package io.thundra.merloc.broker.client.impl;

import io.thundra.merloc.broker.client.BrokerClient;
import io.thundra.merloc.broker.client.BrokerCredentials;
import io.thundra.merloc.broker.client.BrokerEnvelope;
import io.thundra.merloc.broker.client.BrokerMessage;
import io.thundra.merloc.broker.client.BrokerMessageCallback;
import io.thundra.merloc.broker.client.BrokerPayload;
import io.thundra.merloc.common.logger.StdLogger;
import io.thundra.merloc.common.utils.ExceptionUtils;
import io.thundra.merloc.common.utils.ExecutorUtils;
import io.thundra.merloc.common.utils.StringUtils;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.DeserializationFeature;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.ObjectMapper;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.SerializationFeature;
import io.thundra.merloc.thirdparty.okhttp3.Dispatcher;
import io.thundra.merloc.thirdparty.okhttp3.OkHttpClient;
import io.thundra.merloc.thirdparty.okhttp3.Request;
import io.thundra.merloc.thirdparty.okhttp3.Response;
import io.thundra.merloc.thirdparty.okhttp3.WebSocket;
import io.thundra.merloc.thirdparty.okhttp3.WebSocketListener;
import io.thundra.merloc.thirdparty.okio.ByteString;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class OkHttpWebSocketBrokerClient
extends WebSocketListener
implements BrokerClient {
    private static final String API_KEY_HEADER_NAME = "x-api-key";
    private static final int MAX_FRAME_SIZE = 16384;
    private static final int NORMAL_CLOSE_CODE = 1000;
    private static final OkHttpClient baseClient = new OkHttpClient.Builder().dispatcher(new Dispatcher(ExecutorUtils.newCachedExecutorService("broker-client-okhttp-dispatcher", false))).readTimeout(3L, TimeUnit.SECONDS).pingInterval(30L, TimeUnit.SECONDS).build();
    private final ObjectMapper objectMapper = new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false).configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private final ScheduledExecutorService inFlightMessageCleanerExecutorService = ExecutorUtils.newScheduledExecutorService(1, "broker-client-inflight-cleaner");
    private final ScheduledExecutorService idleEnvelopeCleanerExecutorService = ExecutorUtils.newScheduledExecutorService(1, "broker-client-envelope-cleaner");
    private final Map<String, InFlightMessage> messageMap = new ConcurrentHashMap<String, InFlightMessage>();
    private final EnvelopeGlue envelopeGlue = new EnvelopeGlue();
    private final OkHttpClient client;
    private final WebSocket webSocket;
    private final BrokerMessageCallback messageCallback;
    private final CompletableFuture<Boolean> connectedFuture;
    private final CompletableFuture<Boolean> closedFuture;

    public OkHttpWebSocketBrokerClient(String url, BrokerCredentials brokerCredentials, BrokerMessageCallback messageCallback) {
        this(url, brokerCredentials, messageCallback, null, null, null);
    }

    public OkHttpWebSocketBrokerClient(String url, BrokerCredentials brokerCredentials, BrokerMessageCallback messageCallback, Map<String, String> headers, CompletableFuture connectedFuture, CompletableFuture closedFuture) {
        this.messageCallback = messageCallback;
        this.connectedFuture = connectedFuture == null ? new CompletableFuture() : connectedFuture;
        this.closedFuture = closedFuture == null ? new CompletableFuture() : closedFuture;
        Request request = OkHttpWebSocketBrokerClient.buildRequest(url, brokerCredentials, headers);
        this.client = baseClient.newBuilder().build();
        this.webSocket = this.client.newWebSocket(request, this);
        this.idleEnvelopeCleanerExecutorService.scheduleAtFixedRate(() -> this.envelopeGlue.cleanIdleEnvelopes(), 1L, 1L, TimeUnit.MINUTES);
    }

    private static Request buildRequest(String url, BrokerCredentials brokerCredentials, Map<String, String> headers) {
        Request.Builder builder = new Request.Builder();
        url = OkHttpWebSocketBrokerClient.normalizeBrokerUrl(url);
        builder.url(url);
        if (brokerCredentials.getConnectionName() != null) {
            builder.header(API_KEY_HEADER_NAME, brokerCredentials.getConnectionName());
        }
        if (headers != null) {
            for (Map.Entry<String, String> e : headers.entrySet()) {
                String headerName = e.getKey();
                String headerValue = e.getValue();
                builder.header(headerName, headerValue);
            }
        }
        return builder.build();
    }

    private static String normalizeBrokerUrl(String url) {
        if (url.startsWith("ws://") || url.startsWith("wss://")) {
            return url;
        }
        return "wss://" + url;
    }

    @Override
    public boolean isConnected() {
        return this.connectedFuture.isDone() && !this.connectedFuture.isCompletedExceptionally();
    }

    @Override
    public boolean waitUntilConnected() {
        try {
            return this.connectedFuture.get();
        }
        catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    @Override
    public boolean waitUntilConnected(long timeout2, TimeUnit unit) {
        try {
            return this.connectedFuture.get(timeout2, unit);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        }
    }

    private void doSend(BrokerMessage message, String payloadStr) throws IOException {
        int payloadLength = payloadStr.length();
        if (payloadLength < 16384) {
            BrokerEnvelope envelope = new BrokerEnvelope().withId(message.getId()).withResponseOf(message.getResponseOf()).withConnectionName(message.getConnectionName()).withSourceConnectionId(message.getSourceConnectionId()).withSourceConnectionType(message.getSourceConnectionType()).withTargetConnectionId(message.getTargetConnectionId()).withTargetConnectionType(message.getTargetConnectionType()).withType(message.getType()).withPayload(payloadStr);
            String envelopeStr = this.objectMapper.writeValueAsString(envelope);
            if (!this.webSocket.send(envelopeStr)) {
                throw new IOException("Unable to send message");
            }
        } else {
            int fragmentCount = payloadLength / 16384 + (payloadLength % 16384 == 0 ? 0 : 1);
            for (int i = 0; i < fragmentCount; ++i) {
                String fragmentedPayload = payloadStr.substring(i * 16384, Math.min((i + 1) * 16384, payloadLength));
                BrokerEnvelope envelope = new BrokerEnvelope().withId(message.getId()).withResponseOf(message.getResponseOf()).withConnectionName(message.getConnectionName()).withSourceConnectionId(message.getSourceConnectionId()).withSourceConnectionType(message.getSourceConnectionType()).withTargetConnectionId(message.getTargetConnectionId()).withTargetConnectionType(message.getTargetConnectionType()).withType(message.getType()).withPayload(fragmentedPayload).withFragmented(true).withFragmentNo(i).withFragmentCount(fragmentCount);
                String envelopeStr = this.objectMapper.writeValueAsString(envelope);
                if (this.webSocket.send(envelopeStr)) continue;
                throw new IOException("Unable to send message");
            }
        }
    }

    @Override
    public void send(BrokerMessage message) throws IOException {
        if (StringUtils.isNullOrEmpty(message.getId())) {
            message.setId(UUID.randomUUID().toString());
        }
        BrokerPayload payload = new BrokerPayload().withData(message.getData()).withError(message.getError());
        String payloadStr = this.objectMapper.writeValueAsString(payload);
        this.doSend(message, payloadStr);
    }

    @Override
    public BrokerMessage sendAndGetResponse(BrokerMessage message, long timeout2, TimeUnit timeUnit) throws IOException {
        if (StringUtils.isNullOrEmpty(message.getId())) {
            message.setId(UUID.randomUUID().toString());
        }
        CompletableFuture responseFuture = new CompletableFuture();
        ScheduledFuture<?> scheduledFuture = this.inFlightMessageCleanerExecutorService.schedule(() -> {
            InFlightMessage inFlightMessage = this.messageMap.remove(message.getId());
            if (inFlightMessage != null && inFlightMessage.completableFuture != null) {
                inFlightMessage.completableFuture.completeExceptionally(new TimeoutException(String.format("Message with id %s has timed-out", message.getId())));
            }
        }, timeout2, timeUnit);
        this.messageMap.put(message.getId(), new InFlightMessage(responseFuture, scheduledFuture));
        try {
            this.send(message);
        }
        catch (Throwable t) {
            this.messageMap.remove(message.getId());
            scheduledFuture.cancel(true);
            ExceptionUtils.sneakyThrow(t);
        }
        try {
            return (BrokerMessage)responseFuture.get(timeout2, timeUnit);
        }
        catch (Throwable t) {
            if (t instanceof ExecutionException) {
                t = t.getCause();
            }
            StdLogger.error("Unable to get response", t);
            return null;
        }
    }

    @Override
    public void sendCloseMessage(int code, String reason) throws IOException {
        if (!this.webSocket.close(code, reason)) {
            throw new IOException("Unable to send message");
        }
    }

    @Override
    public void close() {
        try {
            this.webSocket.close(1000, null);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Override
    public void destroy() {
        try {
            this.webSocket.cancel();
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.inFlightMessageCleanerExecutorService.shutdownNow();
        this.idleEnvelopeCleanerExecutorService.shutdownNow();
    }

    @Override
    public boolean isClosed() {
        return this.closedFuture.isDone() && !this.closedFuture.isCompletedExceptionally();
    }

    @Override
    public boolean waitUntilClosed() {
        try {
            return this.closedFuture.get();
        }
        catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    @Override
    public boolean waitUntilClosed(long timeout2, TimeUnit unit) {
        try {
            return this.closedFuture.get(timeout2, unit);
        }
        catch (InterruptedException | ExecutionException | TimeoutException e) {
            return false;
        }
    }

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        StdLogger.debug("OPEN: " + response.message());
        this.connectedFuture.complete(true);
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        if (StdLogger.DEBUG_ENABLED) {
            StdLogger.debug("MESSAGE: " + text);
        }
        this.receiveMessage(text);
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        String text = bytes.utf8();
        if (StdLogger.DEBUG_ENABLED) {
            StdLogger.debug("MESSAGE: " + text);
        }
        this.receiveMessage(text);
    }

    private void receiveMessage(String text) {
        try {
            BrokerEnvelope envelope = this.objectMapper.readValue(text, BrokerEnvelope.class);
            String payloadStr = envelope.getPayload();
            if (StringUtils.isNullOrEmpty(payloadStr)) {
                StdLogger.error("Empty payload in envelope");
                return;
            }
            if (envelope.isFragmented()) {
                this.envelopeGlue.glue(envelope);
            } else {
                BrokerPayload payload = this.objectMapper.readValue(payloadStr, BrokerPayload.class);
                if (payload == null) {
                    StdLogger.error("Empty payload in envelope");
                    return;
                }
                BrokerMessage message = new BrokerMessage().withId(envelope.getId()).withResponseOf(envelope.getResponseOf()).withConnectionName(envelope.getConnectionName()).withSourceConnectionId(envelope.getSourceConnectionId()).withSourceConnectionType(envelope.getSourceConnectionType()).withTargetConnectionId(envelope.getTargetConnectionId()).withTargetConnectionType(envelope.getTargetConnectionType()).withType(envelope.getType()).withData(payload.getData()).withError(payload.getError());
                this.handleMessage(message);
            }
        }
        catch (Throwable error) {
            StdLogger.error(String.format("Unable to deserialize broker message: %s", text), error);
        }
    }

    private void handleMessage(BrokerMessage message) {
        try {
            InFlightMessage inFlightMessage;
            if (StringUtils.hasValue(message.getResponseOf()) && (inFlightMessage = this.messageMap.remove(message.getResponseOf())) != null) {
                if (inFlightMessage.scheduledFuture != null) {
                    inFlightMessage.scheduledFuture.cancel(true);
                }
                if (inFlightMessage.completableFuture != null) {
                    inFlightMessage.completableFuture.complete(message);
                }
            }
            if (this.messageCallback != null) {
                this.messageCallback.onMessage(this, message);
            }
        }
        catch (Throwable error) {
            StdLogger.error(String.format("Unable to handle broker message: %s", message), error);
        }
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        StdLogger.debug("CLOSING: " + code + " " + reason);
        webSocket.close(1000, null);
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        StdLogger.debug("CLOSED: " + code + " " + reason);
        this.closedFuture.complete(true);
        this.destroyInFlightMessages(code, reason);
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        StdLogger.error("FAILED: ", t);
        try {
            if (response != null && response.body() != null) {
                StdLogger.error(response.body().string());
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (!this.isConnected()) {
            this.connectedFuture.completeExceptionally(t);
        }
        this.closedFuture.completeExceptionally(t);
        this.destroyInFlightMessages(-1, t.getMessage());
    }

    private void destroyInFlightMessages(int code, String reason) {
        Iterator<InFlightMessage> iter = this.messageMap.values().iterator();
        while (iter.hasNext()) {
            InFlightMessage inFlightMessage = iter.next();
            iter.remove();
            if (inFlightMessage.scheduledFuture != null) {
                inFlightMessage.scheduledFuture.cancel(true);
            }
            if (inFlightMessage.completableFuture == null) continue;
            if (code == 1000) {
                inFlightMessage.completableFuture.complete(null);
                continue;
            }
            inFlightMessage.completableFuture.completeExceptionally(new IOException(String.format("Connection is closed (code=%d, reason=%s)", code, reason)));
        }
    }

    private class EnvelopeGlue {
        private final long ENVELOPE_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(1L);
        private final Map<BrokerEnvelopeKey, Set<BrokerEnvelope>> envelopeMap = new ConcurrentHashMap<BrokerEnvelopeKey, Set<BrokerEnvelope>>();

        private EnvelopeGlue() {
        }

        private void glueSiblingEnvelopesAndHandleMessage(Set<BrokerEnvelope> siblingEnvelopes) {
            BrokerEnvelope firstEnvelope = siblingEnvelopes.iterator().next();
            StringBuilder payloadBuilder = new StringBuilder();
            for (BrokerEnvelope envelope : siblingEnvelopes) {
                payloadBuilder.append(envelope.getPayload());
            }
            String payloadStr = payloadBuilder.toString();
            try {
                BrokerPayload payload = OkHttpWebSocketBrokerClient.this.objectMapper.readValue(payloadStr, BrokerPayload.class);
                BrokerMessage message = new BrokerMessage().withId(firstEnvelope.getId()).withResponseOf(firstEnvelope.getResponseOf()).withConnectionName(firstEnvelope.getConnectionName()).withSourceConnectionId(firstEnvelope.getSourceConnectionId()).withSourceConnectionType(firstEnvelope.getSourceConnectionType()).withTargetConnectionId(firstEnvelope.getTargetConnectionId()).withTargetConnectionType(firstEnvelope.getTargetConnectionType()).withType(firstEnvelope.getType()).withData(payload.getData()).withError(payload.getError());
                OkHttpWebSocketBrokerClient.this.handleMessage(message);
            }
            catch (Throwable t) {
                StdLogger.error(String.format("Unable to deserialize broker message from glued data: %s", payloadStr), t);
            }
        }

        private void glue(BrokerEnvelope envelope) {
            Set<BrokerEnvelope> existingSiblingEnvelopes;
            String id = envelope.getId();
            int fragmentCount = envelope.getFragmentCount();
            BrokerEnvelopeKey key = new BrokerEnvelopeKey(id, System.currentTimeMillis());
            Set<BrokerEnvelope> siblingEnvelopes = this.envelopeMap.get(key);
            if (siblingEnvelopes == null && (existingSiblingEnvelopes = this.envelopeMap.putIfAbsent(key, siblingEnvelopes = new ConcurrentSkipListSet<BrokerEnvelope>(Comparator.comparingInt(BrokerEnvelope::getFragmentNo)))) != null) {
                siblingEnvelopes = existingSiblingEnvelopes;
            }
            siblingEnvelopes.add(envelope);
            if (siblingEnvelopes.size() == fragmentCount) {
                this.envelopeMap.remove(key);
                this.glueSiblingEnvelopesAndHandleMessage(siblingEnvelopes);
            }
        }

        private void cleanIdleEnvelopes() {
            long currentTime = System.currentTimeMillis();
            Iterator<BrokerEnvelopeKey> iter = this.envelopeMap.keySet().iterator();
            while (iter.hasNext()) {
                BrokerEnvelopeKey key = iter.next();
                if (currentTime - key.initTime <= this.ENVELOPE_IDLE_TIMEOUT) continue;
                iter.remove();
            }
        }
    }

    private static class BrokerEnvelopeKey {
        private final String id;
        private final long initTime;

        private BrokerEnvelopeKey(String id, long initTime) {
            this.id = id;
            this.initTime = initTime;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            BrokerEnvelopeKey that = (BrokerEnvelopeKey)o;
            return Objects.equals(this.id, that.id);
        }

        public int hashCode() {
            return Objects.hash(this.id);
        }
    }

    private class InFlightMessage {
        private final CompletableFuture completableFuture;
        private final ScheduledFuture scheduledFuture;

        private InFlightMessage(CompletableFuture completableFuture, ScheduledFuture scheduledFuture) {
            this.completableFuture = completableFuture;
            this.scheduledFuture = scheduledFuture;
        }
    }
}

