package net.soundvibe.reacto.discovery.couchbase;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.TemporaryFailureException;
import com.couchbase.client.java.error.TemporaryLockFailureException;
import com.couchbase.client.java.view.DefaultView;
import com.couchbase.client.java.view.DesignDocument;
import com.couchbase.client.java.view.Stale;
import com.couchbase.client.java.view.ViewQuery;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import net.soundvibe.reacto.client.events.EventHandlerRegistry;
import net.soundvibe.reacto.discovery.AbstractServiceRegistry;
import net.soundvibe.reacto.discovery.types.ServiceRecord;
import net.soundvibe.reacto.discovery.types.ServiceType;
import net.soundvibe.reacto.discovery.types.Status;
import net.soundvibe.reacto.mappers.ServiceRegistryMapper;
import net.soundvibe.reacto.types.Any;
import net.soundvibe.reacto.types.Command;
import net.soundvibe.reacto.types.json.JsonObject;
import net.soundvibe.reacto.utils.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;

/* loaded from: input_file:net/soundvibe/reacto/discovery/couchbase/CouchbaseServiceRegistry.class */
/**
 * Service registry that keeps service records as JSON documents in a Couchbase bucket
 * and discovers them through a Couchbase view.
 *
 * <p>The local {@link ServiceRecord} is upserted under its {@code registrationId} with an
 * expiry of 1.5 times the heartbeat interval (see {@link #ttl()}); while the registry is
 * open, a timer touches the document once per heartbeat interval to refresh that expiry.
 *
 * <p>NOTE(review): several public/protected methods here ({@code register}, {@code unregister},
 * {@code findRecordsOf}) presumably override members of {@link AbstractServiceRegistry};
 * {@code @Override} is not added because the super type is not visible from this file.
 */
public final class CouchbaseServiceRegistry extends AbstractServiceRegistry {
    private static final Logger log = LoggerFactory.getLogger(CouchbaseServiceRegistry.class);

    /** View query used when the caller does not supply one: design document "reacto", view "services". */
    public static final ViewQuery DEFAULT_VIEW_QUERY = ViewQuery.from("reacto", "services");

    /** Placeholder record; {@link #register()} and {@link #unregister()} do nothing for a record equal to it. */
    public static final ServiceRecord DEFAULT_SERVICE_RECORD = ServiceRecord.create("UNKNOWN", Status.UNKNOWN, ServiceType.LOCAL, UUID.randomUUID().toString(), JsonObject.empty(), JsonObject.empty());

    /** Heartbeat interval, in seconds, used by the four-argument constructor. */
    public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS = 60;

    /** Upper bound on the retry counter accepted by {@link #shouldRetry(Integer, Throwable)}. */
    private static final int MAX_RETRIES = 10;

    /** Map function installed by {@link #updateDefaultView(String, String)}. */
    private static final String viewMapFunction = "function (doc, meta) {\n  if (doc.status && doc.objectType) {\n    if (doc.objectType === \"reacto-service-registry\" && doc.status === \"UP\") {\n      emit(doc.registration, null);\n    }\n  }\n}";

    private final Supplier<Bucket> bucketSupplier;
    private final ViewQuery viewQuery;
    private final ServiceRecord serviceRecord;
    /** True between a successful {@link #register()} and a successful {@link #unregister()}. */
    private final AtomicBoolean isOpen = new AtomicBoolean(false);
    /** Currently running heartbeat timer, or {@code null} when no heartbeat is scheduled. */
    private final AtomicReference<Timer> timer = new AtomicReference<>();
    /** Guards against installing more than one JVM shutdown hook for this instance. */
    private final AtomicBoolean shutdownHookInstalled = new AtomicBoolean(false);
    private final com.couchbase.client.java.document.json.JsonObject serviceObject;
    private final int heartBeatIntervalInSeconds;

    /**
     * Creates a registry using {@link #DEFAULT_VIEW_QUERY} and
     * {@link #DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS}.
     *
     * @param supplier              provides the bucket for every operation; must not be null
     * @param eventHandlerRegistry  passed to the super class
     * @param serviceRegistryMapper passed to the super class
     * @param serviceRecord         record describing the local service; must not be null
     */
    public CouchbaseServiceRegistry(Supplier<Bucket> supplier, EventHandlerRegistry eventHandlerRegistry, ServiceRegistryMapper serviceRegistryMapper, ServiceRecord serviceRecord) {
        this(supplier, DEFAULT_VIEW_QUERY, eventHandlerRegistry, serviceRegistryMapper, serviceRecord, DEFAULT_HEARTBEAT_INTERVAL_IN_SECONDS);
    }

    /**
     * Creates a registry with an explicit view query and heartbeat interval.
     *
     * @param supplier              provides the bucket for every operation; must not be null
     * @param viewQuery             view query used to discover records; must not be null
     * @param eventHandlerRegistry  passed to the super class
     * @param serviceRegistryMapper passed to the super class
     * @param serviceRecord         record describing the local service; must not be null
     * @param i                     heartbeat interval in seconds; also determines the document
     *                              expiry. NOTE(review): the value is not validated here;
     *                              non-positive values are presumably invalid - confirm.
     * @throws NullPointerException if {@code supplier}, {@code viewQuery} or
     *                              {@code serviceRecord} is null
     */
    public CouchbaseServiceRegistry(Supplier<Bucket> supplier, ViewQuery viewQuery, EventHandlerRegistry eventHandlerRegistry, ServiceRegistryMapper serviceRegistryMapper, ServiceRecord serviceRecord, int i) {
        super(eventHandlerRegistry, serviceRegistryMapper);
        this.bucketSupplier = Objects.requireNonNull(supplier, "bucketSupplier cannot be null");
        this.viewQuery = Objects.requireNonNull(viewQuery, "viewQuery cannot be null");
        this.serviceRecord = Objects.requireNonNull(serviceRecord, "serviceRecord cannot be null");
        this.serviceObject = toCouchbaseObject(serviceRecord);
        this.heartBeatIntervalInSeconds = i;
    }

    /**
     * Converts a service record to a Couchbase JSON object by round-tripping through
     * its JSON string form.
     *
     * @param serviceRecord record to convert
     * @return Couchbase JSON object parsed from {@code serviceRecord.toJson()}
     */
    public static com.couchbase.client.java.document.json.JsonObject toCouchbaseObject(ServiceRecord serviceRecord) {
        return com.couchbase.client.java.document.json.JsonObject.fromJson(serviceRecord.toJson());
    }

    /**
     * Installs the registry's map function into a view and publishes the design document.
     *
     * <p>If the design document exists, every view named {@code str2} is replaced with one
     * using the registry's map function and the document is upserted; if the lookup emits
     * nothing, a new design document containing only that view is inserted. Either way the
     * design document is then published with the overwrite flag set.
     *
     * <p>NOTE(review): when the design document exists but has no view named {@code str2},
     * the view is not added - confirm whether that is intended.
     *
     * @param str  design document name
     * @param str2 view name
     * @return observable emitting the published design document
     */
    public Observable<DesignDocument> updateDefaultView(String str, String str2) {
        return this.bucketSupplier.get().bucketManager().async().getDesignDocument(str)
                .doOnNext(existing -> existing.views().replaceAll(
                        view -> str2.equals(view.name()) ? DefaultView.create(view.name(), viewMapFunction) : view))
                .flatMap(updated -> this.bucketSupplier.get().bucketManager().async().upsertDesignDocument(updated))
                .switchIfEmpty(Observable.defer(() -> this.bucketSupplier.get().bucketManager().async()
                        .insertDesignDocument(DesignDocument.create(str, Collections.singletonList(DefaultView.create(str2, viewMapFunction))))))
                .flatMap(stored -> this.bucketSupplier.get().bucketManager().async().publishDesignDocument(str, true));
    }

    /**
     * Runs the configured view query and converts the document behind every row
     * into a {@link ServiceRecord}.
     *
     * @return observable emitting one record per view row
     */
    public Observable<ServiceRecord> findRecords() {
        return this.bucketSupplier.get().async().query(this.viewQuery)
                .retry(CouchbaseServiceRegistry::shouldRetry)
                .flatMap(result -> result.rows())
                .flatMap(row -> row.document())
                .map(document -> document.content())
                .map(CouchbaseServiceRegistry::toRecord);
    }

    /**
     * Collects the records from {@link #findRecords()} that are compatible with the command.
     *
     * @param command command to match records against
     * @return observable emitting a single list of compatible records
     */
    protected Observable<List<ServiceRecord>> findRecordsOf(Command command) {
        return findRecords()
                .filter(candidate -> candidate.isCompatibleWith(command))
                .toList()
                .defaultIfEmpty(Collections.emptyList());
    }

    /**
     * Retry predicate: retries only temporary failures, lock failures and CAS mismatches,
     * and only while the retry counter is below {@link #MAX_RETRIES}.
     *
     * @param num retry counter supplied by {@code Observable.retry}
     * @param th  error that terminated the source
     * @return {@code true} when the operation should be resubscribed
     */
    private static Boolean shouldRetry(Integer num, Throwable th) {
        if (num >= MAX_RETRIES) {
            return false;
        }
        return th instanceof TemporaryFailureException
                || th instanceof TemporaryLockFailureException
                || th instanceof CASMismatchException;
    }

    /**
     * Converts a Couchbase JSON object to a service record via its string form.
     *
     * @param jsonObject document content read from the bucket
     * @return record parsed from {@code jsonObject.toString()}
     */
    public static ServiceRecord toRecord(com.couchbase.client.java.document.json.JsonObject jsonObject) {
        return ServiceRecord.fromJson(jsonObject.toString());
    }

    /**
     * Removes the document of the given record and then queries the view with
     * {@code Stale.FALSE} so the removal is reflected in the index.
     *
     * @param serviceRecord record whose {@code registrationId} identifies the document
     * @return observable emitting {@link Any#VOID} on success, or failing with
     *         {@link IllegalStateException} when the view query reports no success
     */
    public Observable<Any> unpublish(ServiceRecord serviceRecord) {
        Observable<Boolean> removed = this.bucketSupplier.get().async()
                .remove(serviceRecord.registrationId, PersistTo.NONE, ReplicateTo.NONE)
                .flatMap(document -> refreshView());
        return requireSuccess(removed, "Service record was not unpublished");
    }

    /**
     * Upserts the local service document with the expiry from {@link #ttl()} and then
     * queries the view with {@code Stale.FALSE} so the new record is indexed.
     *
     * @return observable emitting {@link Any#VOID} on success, or failing with
     *         {@link IllegalStateException} when the view query reports no success
     */
    public Observable<Any> publish() {
        Observable<Boolean> stored = this.bucketSupplier.get().async()
                .upsert(JsonDocument.create(this.serviceRecord.registrationId, ttl(), this.serviceObject), PersistTo.NONE, ReplicateTo.NONE)
                .flatMap(document -> refreshView());
        return requireSuccess(stored, "Service record was not published");
    }

    /**
     * Touches the local service document to reset its expiry to {@link #ttl()}.
     *
     * @return observable emitting {@link Any#VOID} on success, or failing with
     *         {@link IllegalStateException} when the touch reports no success
     */
    public Observable<Any> update() {
        Observable<Boolean> touched = this.bucketSupplier.get().async()
                .touch(this.serviceRecord.registrationId, ttl());
        return requireSuccess(touched, "Service record was not updated");
    }

    /** Queries a single row of the configured view with {@code Stale.FALSE} and emits whether it succeeded. */
    private Observable<Boolean> refreshView() {
        ViewQuery freshQuery = ViewQuery.from(this.viewQuery.getDesign(), this.viewQuery.getView())
                .stale(Stale.FALSE)
                .limit(1);
        return this.bucketSupplier.get().async().query(freshQuery).map(result -> result.success());
    }

    /** Applies the retry policy and turns a {@code true} outcome into {@link Any#VOID}, anything else into an error. */
    private static Observable<Any> requireSuccess(Observable<Boolean> outcome, String failureMessage) {
        return outcome
                .retry(CouchbaseServiceRegistry::shouldRetry)
                .filter(success -> success)
                .map(success -> Any.VOID)
                .switchIfEmpty(Observable.error(new IllegalStateException(failureMessage)));
    }

    /** Document expiry in seconds: one and a half heartbeat intervals, truncated to an int. */
    private int ttl() {
        return (int) (this.heartBeatIntervalInSeconds * 1.5d);
    }

    /**
     * Schedules the periodic heartbeat and remembers its timer. A timer left over from an
     * earlier registration is cancelled so it cannot keep running unreferenced.
     */
    private void startHeartBeat() {
        Timer heartBeat = Scheduler.scheduleAtFixedInterval(
                TimeUnit.SECONDS.toMillis(this.heartBeatIntervalInSeconds),
                () -> sendHeartBeat(),
                "Couchbase service registry heartbeat");
        Timer previous = this.timer.getAndSet(heartBeat);
        if (previous != null) {
            previous.cancel();
        }
    }

    /**
     * Single heartbeat tick: refreshes the document expiry while the registry is open.
     * {@link #update()} is invoked inside the observable chain so that anything it throws
     * is delivered to the error handler instead of escaping into the timer thread.
     */
    private void sendHeartBeat() {
        Observable.just(this.serviceRecord)
                .filter(record -> this.isOpen.get())
                .flatMap(record -> update())
                .subscribe(
                        updated -> log.info("Service was updated successfully"),
                        error -> log.error("Error when updating service registration: " + error, error));
    }

    /** Cancels the heartbeat timer, if one is running, and clears the reference. */
    private void stopHeartBeat() {
        Timer current = this.timer.getAndSet(null);
        if (current != null) {
            current.cancel();
        }
    }

    /**
     * Installs, at most once per instance, a JVM shutdown hook that unregisters the service.
     * A single hook is sufficient because {@link #unregister()} is a no-op while closed.
     */
    private void installShutdownHookOnce() {
        if (!this.shutdownHookInstalled.compareAndSet(false, true)) {
            return;
        }
        try {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                log.info("Executing shutdown hook...");
                unregister().subscribe(
                        unregistered -> log.info("Service was successfully unregistered before shutting down"),
                        error -> log.error("Service was unable to unregister before shutting down: " + error, error));
            }));
        } catch (RuntimeException e) {
            // Installation failed, so allow a later register() to try again.
            this.shutdownHookInstalled.set(false);
            throw e;
        }
    }

    /**
     * Publishes the local service record, starts the heartbeat, installs the shutdown hook
     * and marks the registry open. Emits nothing when the record equals
     * {@link #DEFAULT_SERVICE_RECORD} or the registry is already open.
     *
     * @return observable emitting {@link Any#VOID} once registration has completed
     */
    public Observable<Any> register() {
        return Observable.just(this.serviceRecord)
                .filter(record -> !record.equals(DEFAULT_SERVICE_RECORD))
                .filter(record -> !this.isOpen.get())
                .flatMap(record -> publish())
                .doOnNext(published -> startHeartBeat())
                .doOnNext(published -> installShutdownHookOnce())
                .doOnNext(published -> this.isOpen.set(true));
    }

    /**
     * Stops the heartbeat, removes the local service record and marks the registry closed.
     * Emits nothing when the record equals {@link #DEFAULT_SERVICE_RECORD} or the registry
     * is not open. The registry stays marked open if the removal fails.
     *
     * @return observable emitting {@link Any#VOID} once the record has been unpublished
     */
    public Observable<Any> unregister() {
        return Observable.just(this.serviceRecord)
                .filter(record -> !record.equals(DEFAULT_SERVICE_RECORD))
                .filter(record -> this.isOpen.get())
                .doOnNext(record -> stopHeartBeat())
                .flatMap(this::unpublish)
                .doOnNext(unpublished -> this.isOpen.set(false));
    }
}
