/*
 * Decompiled with CFR 0.152.
 */
package net.finmath.smartcontract.valuation.marketdata.generators.legacy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.neovisionaries.ws.client.WebSocket;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.io.File;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.stream.Stream;
import net.finmath.smartcontract.model.MarketDataSet;
import net.finmath.smartcontract.model.MarketDataSetValuesInner;
import net.finmath.smartcontract.model.RefinitivMarketData;
import net.finmath.smartcontract.model.RefinitivMarketDataFields;
import net.finmath.smartcontract.model.RefinitivMarketDataKey;
import net.finmath.smartcontract.valuation.marketdata.curvecalibration.CalibrationDataItem;
import net.finmath.smartcontract.valuation.marketdata.generators.legacy.LiveFeedAdapter;
import net.finmath.time.businessdaycalendar.BusinessdayCalendarExcludingTARGETHolidays;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Sinks;

public class ReactiveMarketDataUpdater
extends LiveFeedAdapter<MarketDataSet> {
    private static final Logger logger = LoggerFactory.getLogger(ReactiveMarketDataUpdater.class);
    private final JSONObject authJson;
    private final String position;
    private final Set<CalibrationDataItem.Spec> calibrationSpecs;
    private final PublishSubject<MarketDataSet> publishSubject;
    private final Sinks.Many<MarketDataSet> sink;
    private final ObjectMapper mapper = new ObjectMapper();
    boolean requestSent;
    private MarketDataSet marketDataSet;

    public ReactiveMarketDataUpdater(JSONObject authJson, String position, List<CalibrationDataItem.Spec> itemList) {
        this.mapper.registerModule((Module)new JavaTimeModule());
        this.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).configure(SerializationFeature.INDENT_OUTPUT, true).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
        this.marketDataSet = new MarketDataSet();
        this.calibrationSpecs = new LinkedHashSet<CalibrationDataItem.Spec>(itemList);
        this.authJson = authJson;
        this.position = position;
        this.requestSent = false;
        this.publishSubject = PublishSubject.create();
        this.sink = Sinks.many().multicast().onBackpressureBuffer();
    }

    private boolean allQuotesRetrieved() {
        return this.marketDataSet.getValues().size() >= this.calibrationSpecs.size();
    }

    private void reset() {
        this.marketDataSet = new MarketDataSet();
    }

    public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
        this.sendLoginRequest(websocket, this.authJson.getString("access_token"), true);
        logger.info("WebSocket successfully connected! Resetting connection.");
        this.closeStreamsAndLogoff(websocket);
        logger.info("Connection reset. Reopening connection...");
        this.sendLoginRequest(websocket, this.authJson.getString("access_token"), true);
        logger.info("...done");
    }

    @Override
    public Observable<MarketDataSet> asObservable() {
        return this.publishSubject;
    }

    public void onTextMessage(WebSocket websocket, String message) {
        if (!message.isEmpty()) {
            try {
                List marketDataValues = (List)this.mapper.readerForListOf(RefinitivMarketData.class).readValue(message);
                this.marketDataSet.requestTimestamp(OffsetDateTime.now(ZoneId.of("GMT")).withNano(0));
                for (RefinitivMarketData mdi : marketDataValues) {
                    RefinitivMarketDataKey refinitivMarketDataKey = Objects.requireNonNull(mdi.getKey());
                    String symbol = refinitivMarketDataKey.getName();
                    RefinitivMarketDataFields refinitivMarketDataFields = Objects.requireNonNull(mdi.getFields());
                    OffsetDateTime dataTimestamp = OffsetDateTime.parse(String.valueOf(refinitivMarketDataFields.getVALUEDT1()) + "T" + refinitivMarketDataFields.getVALUETS1() + "Z");
                    OptionalDouble optValue = Stream.of(refinitivMarketDataFields.getASK(), refinitivMarketDataFields.getBID()).filter(Objects::nonNull).mapToDouble(x -> x).average();
                    if (optValue.isEmpty()) {
                        throw new IllegalStateException("Failed to get average");
                    }
                    boolean hasSymbol = false;
                    for (MarketDataSetValuesInner i : this.marketDataSet.getValues()) {
                        hasSymbol |= i.getSymbol().equals(refinitivMarketDataKey.getName());
                    }
                    if (hasSymbol) continue;
                    this.marketDataSet.addValuesItem(new MarketDataSetValuesInner().value(optValue.getAsDouble()).dataTimestamp(dataTimestamp).symbol(symbol));
                }
            }
            catch (JsonProcessingException | IllegalStateException | NullPointerException e) {
                logger.info("JSON mapper is failing silently in order to skip message:{}{}{}as it is not a quote/fixing update.", new Object[]{System.lineSeparator(), message, System.lineSeparator()});
            }
            if (!this.requestSent) {
                this.sendRICRequest(websocket);
                this.requestSent = true;
            }
        }
        if (this.allQuotesRetrieved()) {
            this.publishSubject.onNext((Object)this.marketDataSet);
            this.sink.tryEmitNext((Object)this.marketDataSet);
            this.reset();
            this.requestSent = false;
        }
    }

    private void sendRICRequest(WebSocket websocket) {
        String keyString1 = this.ricsToString();
        String requestJsonString = "{\"ID\":2," + keyString1 + ",\"View\":[\"MID\",\"BID\",\"ASK\",\"VALUE_DT1\",\"VALUE_TS1\"]}";
        websocket.sendText(requestJsonString);
    }

    @Override
    public void closeStreamsAndLogoff(WebSocket webSocket) {
        String request = "{\"ID\":1, \"Type\": \"Close\", \"Domain\":\"Login\"}";
        webSocket.sendText(request);
    }

    private MarketDataSetValuesInner overnightFixingPostProcessing(MarketDataSetValuesInner datapoint, boolean isOvernightFixing) {
        if (datapoint.getSymbol().equals("EUROSTR=") && isOvernightFixing) {
            BusinessdayCalendarExcludingTARGETHolidays bdCalendar = new BusinessdayCalendarExcludingTARGETHolidays();
            LocalDateTime rolledDate = bdCalendar.getRolledDate(datapoint.getDataTimestamp().toLocalDate(), 1).atTime(datapoint.getDataTimestamp().toLocalTime());
            System.out.println(rolledDate);
            OffsetDateTime dataTimestamp = OffsetDateTime.parse(String.valueOf(rolledDate) + "Z");
            return new MarketDataSetValuesInner().symbol("EUROSTR=").value(datapoint.getValue()).dataTimestamp(dataTimestamp);
        }
        return datapoint;
    }

    @Override
    public void writeDataset(String importFile, MarketDataSet transferMessage, boolean isOvernightFixing) throws IOException {
        transferMessage.values(transferMessage.getValues().stream().map(x -> this.overnightFixingPostProcessing((MarketDataSetValuesInner)x, isOvernightFixing)).toList());
        File targetFile = new File(importFile);
        this.mapper.writerFor(MarketDataSet.class).writeValue(targetFile, (Object)transferMessage);
    }

    private void sendLoginRequest(WebSocket websocket, String authToken, boolean isFirstLogin) throws Exception {
        String loginJsonString = "{\"ID\":1,\"Domain\":\"Login\",\"Key\":{\"Elements\":{\"ApplicationId\":\"\",\"Position\":\"\",\"AuthenticationToken\":\"\"},\"NameType\":\"AuthnToken\"}}";
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode loginJson = (ObjectNode)mapper.readTree(loginJsonString);
        ((ObjectNode)loginJson.get("Key").get("Elements")).put("AuthenticationToken", authToken);
        ((ObjectNode)loginJson.get("Key").get("Elements")).put("ApplicationId", "256");
        ((ObjectNode)loginJson.get("Key").get("Elements")).put("Position", this.position);
        if (!isFirstLogin) {
            loginJson.put("Refresh", false);
        }
        websocket.sendText(loginJson.toString());
    }

    private String ricsToString() {
        StringBuilder ricsAsString = new StringBuilder("\"Key\":{\"Name\":[");
        for (CalibrationDataItem.Spec item : this.calibrationSpecs) {
            ricsAsString.append("\"").append(item.getKey()).append("\",");
        }
        ricsAsString = new StringBuilder(ricsAsString.substring(0, ricsAsString.length() - 1));
        ricsAsString.append("]}");
        return ricsAsString.toString();
    }
}

