/*
 * Decompiled with CFR 0.152.
 */
package io.opensergo;

import com.google.protobuf.Any;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.opensergo.ConfigKindMetadata;
import io.opensergo.OpenSergoConfigKindRegistry;
import io.opensergo.log.OpenSergoLogger;
import io.opensergo.proto.transport.v1.DataWithVersion;
import io.opensergo.proto.transport.v1.Status;
import io.opensergo.proto.transport.v1.SubscribeRequest;
import io.opensergo.proto.transport.v1.SubscribeResponse;
import io.opensergo.subscribe.LocalDataNotifyResult;
import io.opensergo.subscribe.OpenSergoConfigSubscriber;
import io.opensergo.subscribe.SubscribeKey;
import io.opensergo.subscribe.SubscribeRegistry;
import io.opensergo.subscribe.SubscribedConfigCache;
import io.opensergo.subscribe.SubscribedData;
import io.opensergo.util.StringUtils;
import java.util.ArrayList;
import java.util.List;

public class OpenSergoSubscribeClientObserver
implements ClientResponseObserver<SubscribeRequest, SubscribeResponse> {
    private ClientCallStreamObserver<SubscribeRequest> requestStream;
    private final SubscribedConfigCache configCache;
    private final SubscribeRegistry subscribeRegistry;

    public OpenSergoSubscribeClientObserver(SubscribedConfigCache configCache, SubscribeRegistry subscribeRegistry) {
        this.configCache = configCache;
        this.subscribeRegistry = subscribeRegistry;
    }

    public void beforeStart(ClientCallStreamObserver<SubscribeRequest> requestStream) {
        this.requestStream = requestStream;
    }

    private LocalDataNotifyResult notifyDataChange(SubscribeKey subscribeKey, DataWithVersion dataWithVersion) throws Exception {
        long receivedVersion = dataWithVersion.getVersion();
        SubscribedData cachedData = this.configCache.getDataFor(subscribeKey);
        if (cachedData != null && cachedData.getVersion() > receivedVersion) {
            return new LocalDataNotifyResult().setCode(4010);
        }
        List<Object> dataList = this.decodeActualData(subscribeKey.getKind().getKindName(), dataWithVersion.getDataList());
        this.configCache.updateData(subscribeKey, dataList, receivedVersion);
        List<OpenSergoConfigSubscriber> subscribers = this.subscribeRegistry.getSubscribersOf(subscribeKey);
        if (subscribers == null || subscribers.isEmpty()) {
            return LocalDataNotifyResult.withSuccess(dataList);
        }
        ArrayList<Throwable> notifyErrors = new ArrayList<Throwable>();
        for (OpenSergoConfigSubscriber subscriber : subscribers) {
            try {
                subscriber.onConfigUpdate(subscribeKey, dataList);
            }
            catch (Throwable t) {
                OpenSergoLogger.error("Failed to notify OpenSergo config change event, subscribeKey={}, subscriber={}", subscribeKey, subscriber);
                notifyErrors.add(t);
            }
        }
        if (notifyErrors.isEmpty()) {
            return LocalDataNotifyResult.withSuccess(dataList);
        }
        return new LocalDataNotifyResult().setCode(4007).setDecodedData(dataList).setNotifyErrors(notifyErrors);
    }

    public void onNext(SubscribeResponse pushCommand) {
        if (!StringUtils.isEmpty(pushCommand.getAck())) {
            int code = pushCommand.getStatus().getCode();
            if (code == 1) {
                return;
            }
            if (code >= 4000 && code < 4100) {
                OpenSergoLogger.warn("Warn: req failed, command={}", pushCommand);
                return;
            }
        }
        String kindName = pushCommand.getKind();
        try {
            Status status;
            ConfigKindMetadata kindMetadata = OpenSergoConfigKindRegistry.getKindMetadata(kindName);
            if (kindMetadata == null) {
                throw new IllegalArgumentException("unrecognized config kind: " + kindName);
            }
            SubscribeKey subscribeKey = new SubscribeKey(pushCommand.getNamespace(), pushCommand.getApp(), kindMetadata.getKind());
            LocalDataNotifyResult localResult = this.notifyDataChange(subscribeKey, pushCommand.getDataWithVersion());
            switch (localResult.getCode()) {
                case 1: {
                    status = Status.newBuilder().setCode(1).build();
                    break;
                }
                case 4007: {
                    StringBuilder message = new StringBuilder();
                    for (Throwable t : localResult.getNotifyErrors()) {
                        message.append(t.toString()).append('|');
                    }
                    status = Status.newBuilder().setMessage(message.toString()).setCode(1).build();
                    break;
                }
                case 4010: {
                    status = Status.newBuilder().setCode(4010).setMessage("outdated version").build();
                    break;
                }
                default: {
                    status = Status.newBuilder().setCode(localResult.getCode()).build();
                }
            }
            SubscribeRequest pushAckResponse = SubscribeRequest.newBuilder().setStatus(status).setResponseAck("ACK").setRequestId(pushCommand.getResponseId()).build();
            this.requestStream.onNext((Object)pushAckResponse);
        }
        catch (Exception ex) {
            OpenSergoLogger.error("Handle push command failed", ex);
            SubscribeRequest pushNackResponse = SubscribeRequest.newBuilder().setStatus(Status.newBuilder().setCode(4000).setMessage(ex.toString()).build()).setResponseAck("NACK").build();
            this.requestStream.onNext((Object)pushNackResponse);
        }
    }

    private List<Object> decodeActualData(String kind, List<Any> rawList) throws Exception {
        ConfigKindMetadata kindMetadata = OpenSergoConfigKindRegistry.getKindMetadata(kind);
        if (kindMetadata == null) {
            throw new IllegalArgumentException("unrecognized config kind: " + kind);
        }
        ArrayList<Object> list = new ArrayList<Object>();
        for (Any e : rawList) {
            list.add(e.unpack(kindMetadata.getKindClass()));
        }
        return list;
    }

    public void onError(Throwable t) {
        OpenSergoLogger.error("Fatal error occurred on OpenSergo gRPC ClientObserver", t);
    }

    public void onCompleted() {
        OpenSergoLogger.info("OpenSergoSubscribeClientObserver onCompleted", new Object[0]);
    }
}

