package io.camunda.zeebe.transport.stream.impl;

import io.atomix.cluster.MemberId;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorCode;
import io.camunda.zeebe.transport.stream.impl.messages.ErrorResponse;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.StreamResponse;
import io.camunda.zeebe.transport.stream.impl.messages.UUIDEncoder;
import io.camunda.zeebe.util.CloseableSilently;
import java.util.UUID;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/zeebe/transport/stream/impl/RemoteStreamApiHandler.class */
public final class RemoteStreamApiHandler<M> implements CloseableSilently {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamApiHandler.class);
    private static final UUID NULL_ID = new UUID(UUIDEncoder.highNullValue(), UUIDEncoder.lowNullValue());
    private final AddStreamResponse addResponseOK = new AddStreamResponse();
    private final ErrorResponse errorResponse = new ErrorResponse();
    private final RemoveStreamResponse removeResponseOK = new RemoveStreamResponse();
    private final RemoteStreamRegistry<M> registry;
    private final Function<DirectBuffer, M> metadataFactory;

    public RemoteStreamApiHandler(RemoteStreamRegistry<M> remoteStreamRegistry, Function<DirectBuffer, M> function) {
        this.registry = remoteStreamRegistry;
        this.metadataFactory = function;
    }

    public void close() {
        this.registry.clear();
    }

    public StreamResponse add(MemberId memberId, AddStreamRequest addStreamRequest) {
        try {
            M apply = this.metadataFactory.apply(addStreamRequest.metadata());
            if (addStreamRequest.streamType().capacity() <= 0) {
                return failedResponse(memberId, "Expected a stream type of length > 0, but it has %d".formatted(Integer.valueOf(addStreamRequest.streamType().capacity())));
            }
            if (addStreamRequest.streamId() == null || addStreamRequest.streamId().equals(NULL_ID)) {
                return failedResponse(memberId, "Expected a stream ID, but received a nil UUID ([%s])".formatted(addStreamRequest.streamId()));
            }
            this.registry.add(new UnsafeBuffer(addStreamRequest.streamType()), addStreamRequest.streamId(), memberId, apply);
            LOG.debug("Opened stream {} from {}", addStreamRequest.streamId(), memberId);
            return this.addResponseOK;
        } catch (Exception e) {
            return failedResponse(memberId, "Failed to parse stream metadata (size = '%d') from AddStreamRequest".formatted(Integer.valueOf(addStreamRequest.metadata().capacity())), e);
        }
    }

    public StreamResponse remove(MemberId memberId, RemoveStreamRequest removeStreamRequest) {
        if (removeStreamRequest.streamId() == null || removeStreamRequest.streamId().equals(NULL_ID)) {
            return failedResponse(memberId, "Expected a stream ID, but received a nil UUID ([%s])".formatted(removeStreamRequest.streamId()));
        }
        this.registry.remove(removeStreamRequest.streamId(), memberId);
        LOG.debug("Removed stream {} from {}", removeStreamRequest.streamId(), memberId);
        return this.removeResponseOK;
    }

    public void removeAll(MemberId memberId) {
        this.registry.removeAll(memberId);
        LOG.debug("Removed all streams from {}", memberId);
    }

    private ErrorResponse failedResponse(MemberId memberId, String str, Exception exc) {
        LOG.warn("Failed to open stream for '{}': [{}]", new Object[]{memberId, str, exc});
        return this.errorResponse.code(ErrorCode.MALFORMED).message(str);
    }

    private ErrorResponse failedResponse(MemberId memberId, String str) {
        LOG.warn("Failed to open stream for '{}': [{}]", memberId, str);
        return this.errorResponse.code(ErrorCode.INVALID).message(str);
    }
}
