package org.reaktivity.reaktor.internal;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.broadcast.BroadcastReceiver;
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.reaktivity.nukleus.Controller;
import org.reaktivity.nukleus.ControllerBuilder;
import org.reaktivity.nukleus.ControllerSpi;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.reaktor.internal.layouts.ControlLayout;
import org.reaktivity.reaktor.internal.types.control.CommandFW;
import org.reaktivity.reaktor.internal.types.control.ResponseFW;
import org.reaktivity.reaktor.internal.types.control.RoutedFW;
import org.reaktivity.reaktor.internal.types.control.auth.ResolvedFW;

/* loaded from: input_file:org/reaktivity/reaktor/internal/ControllerBuilderImpl.class */
public final class ControllerBuilderImpl<T extends Controller> implements ControllerBuilder<T> {
    // Configuration passed to every ControllerSpiImpl created by build().
    private final ReaktorConfiguration config;
    // Controller type returned by kind().
    private final Class<T> kind;
    // Factory applied by build(); remains null until setFactory is called.
    private Function<ControllerSpi, T> factory;

    /* loaded from: input_file:org/reaktivity/reaktor/internal/ControllerBuilderImpl$ControllerSpiImpl.class */
    private final class ControllerSpiImpl implements ControllerSpi {
        private final ControlLayout.Builder controlRW;
        private final CommandFW commandRO;
        private final ResponseFW responseRO;
        private final RoutedFW routedRO;
        private final ResolvedFW resolvedRO;
        private final RingBuffer conductorCommands;
        private final CopyBroadcastReceiver conductorResponses;
        private final ConcurrentMap<Long, PendingCommand<?>> commandsByCorrelationId;
        private final MessageHandler responseHandler;
        private final ControlLayout control;
        static final /* synthetic */ boolean $assertionsDisabled;

        private ControllerSpiImpl(ReaktorConfiguration reaktorConfiguration) {
            this.controlRW = new ControlLayout.Builder();
            this.commandRO = new CommandFW();
            this.responseRO = new ResponseFW();
            this.routedRO = new RoutedFW();
            this.resolvedRO = new ResolvedFW();
            this.control = this.controlRW.controlPath(reaktorConfiguration.directory().resolve("control")).commandBufferCapacity(reaktorConfiguration.commandBufferCapacity()).responseBufferCapacity(reaktorConfiguration.responseBufferCapacity()).readonly(true).build();
            this.conductorCommands = new ManyToOneRingBuffer(this.control.commandBuffer());
            this.conductorResponses = new CopyBroadcastReceiver(new BroadcastReceiver(this.control.responseBuffer()));
            this.commandsByCorrelationId = new ConcurrentHashMap();
            this.responseHandler = (v1, v2, v3, v4) -> {
                handleResponse(v1, v2, v3, v4);
            };
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public long nextCorrelationId() {
            return this.conductorCommands.nextCorrelationId();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public int doProcess() {
            return this.conductorResponses.receive(this.responseHandler);
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public void doClose() {
            CloseHelper.quietClose(this.control);
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Long> doResolve(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 17) {
                return doCommand(i, directBuffer, i2, i3, (i4, directBuffer2, i5, i6) -> {
                    return Long.valueOf(this.resolvedRO.wrap(directBuffer2, i5, i5 + i6).authorization());
                });
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Long> doRoute(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 1) {
                return doCommand(i, directBuffer, i2, i3, (i4, directBuffer2, i5, i6) -> {
                    return Long.valueOf(this.routedRO.wrap(directBuffer2, i5, i5 + i6).correlationId());
                });
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Void> doUnresolve(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 18) {
                return doCommand(i, directBuffer, i2, i3);
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Void> doUnroute(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 2) {
                return doCommand(i, directBuffer, i2, i3);
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Void> doFreeze(int i, DirectBuffer directBuffer, int i2, int i3) {
            if ($assertionsDisabled || i == 3) {
                return doCommand(i, directBuffer, i2, i3);
            }
            throw new AssertionError();
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public CompletableFuture<Void> doCommand(int i, DirectBuffer directBuffer, int i2, int i3) {
            return doCommand(i, directBuffer, i2, i3, (i4, directBuffer2, i5, i6) -> {
                return null;
            });
        }

        @Override // org.reaktivity.nukleus.ControllerSpi
        public <R> CompletableFuture<R> doCommand(int i, DirectBuffer directBuffer, int i2, int i3, MessageFunction<R> messageFunction) {
            CompletableFuture<R> completableFuture = new CompletableFuture<>();
            long correlationId = this.commandRO.wrap(directBuffer, i2, i2 + i3).correlationId();
            commandSent(correlationId, new PendingCommand<>(messageFunction, completableFuture));
            if (!this.conductorCommands.write(i, directBuffer, i2, i3)) {
                commandSendFailed(correlationId);
            }
            return completableFuture;
        }

        private int handleResponse(int i, DirectBuffer directBuffer, int i2, int i3) {
            PendingCommand<?> remove = this.commandsByCorrelationId.remove(Long.valueOf(this.responseRO.wrap(directBuffer, i2, i3).correlationId()));
            switch (i) {
                case 1073741824:
                    commandFailed(remove, "command failed");
                    return 1;
                default:
                    commandSucceeded(remove, i, directBuffer, i2, i3);
                    return 1;
            }
        }

        private <R> void commandSent(long j, PendingCommand<R> pendingCommand) {
            this.commandsByCorrelationId.put(Long.valueOf(j), pendingCommand);
        }

        private <R> boolean commandSucceeded(PendingCommand<R> pendingCommand, int i, DirectBuffer directBuffer, int i2, int i3) {
            return pendingCommand != null && pendingCommand.succeeded(i, directBuffer, i2, i3);
        }

        private boolean commandSendFailed(long j) {
            return commandFailed(this.commandsByCorrelationId.remove(Long.valueOf(j)), "unable to offer command");
        }

        private boolean commandFailed(PendingCommand<?> pendingCommand, String str) {
            return pendingCommand != null && pendingCommand.failed(str);
        }

        static {
            $assertionsDisabled = !ControllerBuilderImpl.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/reaktivity/reaktor/internal/ControllerBuilderImpl$PendingCommand.class */
    /**
     * A command awaiting its response: the function that maps the response frame
     * to a result, together with the future that receives that result.
     *
     * @param <R> result type produced by the mapper
     */
    public static final class PendingCommand<R> {
        final MessageFunction<R> mapper;
        final CompletableFuture<R> promise;

        private PendingCommand(MessageFunction<R> mapper, CompletableFuture<R> promise) {
            this.promise = promise;
            this.mapper = mapper;
        }

        /**
         * Applies the mapper to the response frame and completes the future with the result.
         *
         * @return the value returned by {@link CompletableFuture#complete}
         */
        public boolean succeeded(int msgTypeId, DirectBuffer buffer, int index, int length) {
            final R result = mapper.apply(msgTypeId, buffer, index, length);
            return promise.complete(result);
        }

        /**
         * Completes the future exceptionally with an {@link IllegalStateException}
         * carrying the given message.
         *
         * @return the value returned by {@link CompletableFuture#completeExceptionally}
         */
        public boolean failed(String message) {
            final IllegalStateException failure = new IllegalStateException(message);
            return promise.completeExceptionally(failure);
        }
    }

    /**
     * Creates a builder for controllers of the given kind.
     *
     * @param config configuration later passed to the controller SPI created by {@code build()}
     * @param kind controller type returned by {@code kind()}
     */
    public ControllerBuilderImpl(ReaktorConfiguration config, Class<T> kind) {
        this.kind = kind;
        this.config = config;
    }

    /**
     * @return the controller type supplied at construction
     */
    @Override // org.reaktivity.nukleus.ControllerBuilder
    public Class<T> kind() {
        final Class<T> controllerKind = kind;
        return controllerKind;
    }

    /**
     * Stores the factory that {@code build()} applies to create the controller.
     *
     * @param factory creates the controller from a {@link ControllerSpi}
     * @return this builder
     */
    @Override // org.reaktivity.nukleus.ControllerBuilder
    public ControllerBuilder<T> setFactory(Function<ControllerSpi, T> factory) {
        final ControllerBuilderImpl<T> self = this;
        self.factory = factory;
        return self;
    }

    /**
     * Creates the controller by applying the configured factory to a new controller SPI.
     *
     * @return the controller produced by the factory
     * @throws NullPointerException if no factory has been set
     */
    @Override // org.reaktivity.nukleus.ControllerBuilder
    public T build() {
        // Fail before the SPI is constructed when no factory was configured.
        final Function<ControllerSpi, T> controllerFactory = Objects.requireNonNull(factory, "factory");
        final ControllerSpi spi = new ControllerSpiImpl(config);
        return controllerFactory.apply(spi);
    }
}
