package io.rouz.scratch.persist;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.spotify.apollo.Client;
import com.spotify.apollo.Request;
import com.spotify.apollo.Response;
import com.spotify.apollo.Status;
import io.rouz.flo.Fn;
import io.rouz.flo.Task;
import io.rouz.flo.TaskContext;
import io.rouz.flo.TaskId;
import io.rouz.flo.context.AsyncContext;
import io.rouz.scratch.persist.LockHolder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import okio.ByteString;

/* loaded from: input_file:io/rouz/scratch/persist/Dump.class */
public class Dump extends AsyncContext {
    private static final String LOCK_PATH = "/lock";
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final ConcurrentMap<TaskId, EvalBundle<?>> ongoing;
    private final Client client;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/rouz/scratch/persist/Dump$EvalBundle.class */
    public final class EvalBundle<T> {
        private final Task<T> task;
        private final TaskContext.Promise<T> promise;
        private final Optional<Function<ByteString, T>> decoder;

        private EvalBundle(Task<T> task, TaskContext.Promise<T> promise, Optional<Function<ByteString, T>> optional) {
            this.task = task;
            this.promise = promise;
            this.decoder = optional;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void fetchOrElse(TaskContext.Promise<T> promise, Runnable runnable) {
            Preconditions.checkState(this.decoder.isPresent(), "Must have decoder when fetching existing");
            Dump.this.client.send(Dump.this.request("GET").withPayload(Dump.json(this.task.id()))).handleAsync(parseExisting(this.decoder.get()), Dump.this.executor).thenAcceptAsync(consumer(promise, runnable), Dump.this.executor);
        }

        private Consumer<TaskContext.Value<T>> consumer(TaskContext.Promise<T> promise, Runnable runnable) {
            return value -> {
                value.consume(obj -> {
                    promise.set(obj);
                });
                value.onFail(th -> {
                    runnable.run();
                });
            };
        }

        private BiFunction<Response<ByteString>, Throwable, TaskContext.Value<T>> parseExisting(Function<ByteString, T> function) {
            return (response, th) -> {
                TaskContext.Promise promise = Dump.this.promise();
                if (th != null) {
                    promise.fail(th);
                } else if (response.status().code() != Status.OK.code()) {
                    promise.fail(new RuntimeException("Not OK"));
                } else if (response.payload().isPresent()) {
                    promise.set(function.apply((ByteString) response.payload().get()));
                } else {
                    promise.fail(new RuntimeException("Response with no body returned for persisted value from flock"));
                }
                return promise.value();
            };
        }
    }

    /* loaded from: input_file:io/rouz/scratch/persist/Dump$ProcessBundle.class */
    private final class ProcessBundle<T> {
        private final EvalBundle<T> evalBundle;
        private final LockHolder lockHolder;
        private final Fn<TaskContext.Value<T>> processFn;
        private final TaskContext.Promise<T> promise;
        private final Semaphore notified = new Semaphore(1);

        public ProcessBundle(EvalBundle<T> evalBundle, LockHolder lockHolder, Fn<TaskContext.Value<T>> fn, TaskContext.Promise<T> promise) {
            this.evalBundle = evalBundle;
            this.lockHolder = lockHolder;
            this.processFn = fn;
            this.promise = promise;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void process() {
            Consumer<Throwable> onExecutor = Dump.this.onExecutor(th -> {
                ((EvalBundle) this.evalBundle).task.id();
                if (th == null) {
                    invoke();
                } else if (!(th instanceof LockHolder.AlreadyLocked)) {
                    this.promise.fail(th);
                } else {
                    if (this.notified.tryAcquire()) {
                    }
                    this.evalBundle.fetchOrElse(this.promise, () -> {
                        Dump.this.scheduler.schedule(Dump.this.onExecutor(() -> {
                            process();
                        }), 1L, TimeUnit.SECONDS);
                    });
                }
            });
            if (shouldLock()) {
                this.lockHolder.lock(onExecutor);
            } else {
                invoke();
            }
        }

        private void invoke() {
            ((EvalBundle) this.evalBundle).task.id();
            TaskContext.Value value = (TaskContext.Value) this.processFn.get();
            value.consume(obj -> {
                complete(obj, () -> {
                    this.promise.set(obj);
                }, th -> {
                    this.promise.fail(th);
                });
            });
            value.onFail(th -> {
                fail(() -> {
                    this.promise.fail(th);
                });
            });
        }

        private void complete(T t, Runnable runnable, Consumer<Throwable> consumer) {
            if (!shouldLock()) {
                runnable.run();
                return;
            }
            try {
                Optional map = Dump.findEncoder(((EvalBundle) this.evalBundle).task.type()).map(function -> {
                    return (ByteString) function.apply(t);
                });
                if (map.isPresent()) {
                    this.lockHolder.unlock((ByteString) map.get(), Dump.this.onExecutor(runnable));
                } else {
                    this.lockHolder.unlock(Dump.this.onExecutor(runnable));
                }
            } catch (Throwable th) {
                this.lockHolder.unlock(() -> {
                    consumer.accept(th);
                });
            }
        }

        private void fail(Runnable runnable) {
            if (shouldLock()) {
                this.lockHolder.unlock(Dump.this.onExecutor(runnable));
            } else {
                runnable.run();
            }
        }

        private boolean shouldLock() {
            return ((EvalBundle) this.evalBundle).decoder.isPresent();
        }
    }

    protected Dump(ScheduledExecutorService scheduledExecutorService, ExecutorService executorService) {
        super(executorService);
        this.ongoing = Maps.newConcurrentMap();
        this.client = request -> {
            return CompletableFuture.completedFuture(null);
        };
        this.scheduler = (ScheduledExecutorService) Objects.requireNonNull(scheduledExecutorService);
        this.executor = (ExecutorService) Objects.requireNonNull(executorService);
    }

    public <T> TaskContext.Value<T> invokeProcessFn(TaskId taskId, Fn<TaskContext.Value<T>> fn) {
        EvalBundle<T> lookupBundle = lookupBundle(taskId);
        TaskContext.Promise promise = promise();
        new ProcessBundle(lookupBundle, new LockHolder(taskId, "/", this.scheduler), fn, promise).process();
        return promise.value();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Runnable onExecutor(Runnable runnable) {
        return () -> {
            this.executor.submit(runnable);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <T> Consumer<T> onExecutor(Consumer<T> consumer) {
        return obj -> {
            this.executor.submit(() -> {
                consumer.accept(obj);
            });
        };
    }

    private <T> void chain(TaskContext.Value<T> value, TaskContext.Promise<T> promise) {
        promise.getClass();
        value.consume(promise::set);
        promise.getClass();
        value.onFail(promise::fail);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> Optional<Function<T, ByteString>> findEncoder(Class<T> cls) {
        for (Method method : cls.getDeclaredMethods()) {
            if (method.getDeclaredAnnotation(Encoder.class) != null) {
                return Optional.of(obj -> {
                    return (ByteString) invokeAndPropagateException(method, obj);
                });
            }
        }
        return Optional.empty();
    }

    private static Object invokeAndPropagateException(Method method, Object... objArr) {
        try {
            return method.invoke(null, objArr);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw Throwables.propagate(e);
        }
    }

    private <T> EvalBundle<T> lookupBundle(TaskId taskId) {
        EvalBundle<T> evalBundle;
        do {
            evalBundle = (EvalBundle) this.ongoing.get(taskId);
        } while (evalBundle == null);
        return evalBundle;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Request request(String str) {
        return Request.forUri("//lock", str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ByteString json(TaskId taskId) {
        return ByteString.encodeUtf8("{\"task_id\":\"" + taskId.toString() + "\"}");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ByteString json(TaskId taskId, ByteString byteString) {
        return ByteString.encodeUtf8("{\"task_id\":\"" + taskId.toString() + "\", \"data\":\"" + byteString.base64() + "\"}");
    }
}
