package org.eclipse.ditto.services.gateway.endpoints.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.Status;
import akka.http.javadsl.model.ContentType;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.Location;
import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.model.ContentType$;
import akka.http.scaladsl.model.EntityStreamSizeException;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import akka.util.ByteString;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.model.base.acks.AbstractCommandAckRequestSetter;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.auth.AuthorizationModelFactory;
import org.eclipse.ditto.model.base.common.HttpStatus;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.services.gateway.endpoints.routes.whoami.DefaultUserInformation;
import org.eclipse.ditto.services.gateway.endpoints.routes.whoami.Whoami;
import org.eclipse.ditto.services.gateway.endpoints.routes.whoami.WhoamiResponse;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.services.models.acks.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.JsonValueSourceRef;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.base.WithEntity;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayCommandTimeoutException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayServiceUnavailableException;
import org.eclipse.ditto.signals.commands.devops.DevOpsCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.acks.MessageCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.acks.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.signals.commands.things.acks.ThingModifyCommandAckRequestSetter;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/actors/AbstractHttpRequestActor.class */
public abstract class AbstractHttpRequestActor extends AbstractActor {
    /** Message that the response-awaiting behavior matches by equality and merely logs. */
    public static final String COMPLETE_MESSAGE = "complete";
    /** Content type used for the JSON body of responses built from a {@code DittoRuntimeException}. */
    private static final ContentType CONTENT_TYPE_JSON = ContentTypes.APPLICATION_JSON;
    // NOTE(review): not referenced anywhere in the code visible in this class.
    private static final ContentType CONTENT_TYPE_TEXT = ContentTypes.TEXT_PLAIN_UTF8;
    /** Actor to which incoming signals are forwarded. */
    private final ActorRef proxyActor;
    /** Translates internal Ditto headers into the external headers put on HTTP responses. */
    private final HeaderTranslator headerTranslator;
    /** Completed exactly where this actor decides on its final HTTP response. */
    private final CompletableFuture<HttpResponse> httpResponseFuture;
    /** The HTTP request being served; used to derive the Location header URI. */
    private final HttpRequest httpRequest;
    /** Supplies the default timeout applied to non-DevOps commands. */
    private final CommandConfig commandConfig;
    /** Starts the acknowledgement handling for an incoming command. */
    private final AcknowledgementAggregatorActorStarter ackregatorStarter;
    private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

    /** Headers of the command received via {@code handleCommand}; {@code null} until a command arrived. */
    @Nullable
    private DittoHeaders incomingCommandHeaders = null;

    /** Location URI remembered for responses with status CREATED; {@code null} otherwise. */
    @Nullable
    private Uri responseLocationUri = null;

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/actors/AbstractHttpRequestActor$HttpAcknowledgementConfig.class */
    /**
     * {@link AcknowledgementConfig} backed by an {@link HttpConfig}: all three durations it reports are the
     * HTTP request timeout, and the issued max bytes value is {@code 0}.
     */
    private static final class HttpAcknowledgementConfig implements AcknowledgementConfig {
        private final HttpConfig delegate;

        private HttpAcknowledgementConfig(HttpConfig httpConfig) {
            this.delegate = httpConfig;
        }

        /** Wraps the given HTTP config. */
        private static AcknowledgementConfig of(HttpConfig httpConfig) {
            return new HttpAcknowledgementConfig(httpConfig);
        }

        public Duration getForwarderFallbackTimeout() {
            return requestTimeout();
        }

        public Duration getCollectorFallbackLifetime() {
            return requestTimeout();
        }

        public Duration getCollectorFallbackAskTimeout() {
            return requestTimeout();
        }

        public int getIssuedMaxBytes() {
            return 0;
        }

        /** Single source for every duration exposed by this config. */
        private Duration requestTimeout() {
            return this.delegate.getRequestTimeout();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractHttpRequestActor(ActorRef actorRef, HeaderTranslator headerTranslator, HttpRequest httpRequest, CompletableFuture<HttpResponse> completableFuture, HttpConfig httpConfig, CommandConfig commandConfig) {
        this.proxyActor = actorRef;
        this.headerTranslator = headerTranslator;
        this.httpResponseFuture = completableFuture;
        this.httpRequest = httpRequest;
        this.commandConfig = commandConfig;
        this.ackregatorStarter = AcknowledgementAggregatorActorStarter.of(getContext(), HttpAcknowledgementConfig.of(httpConfig), headerTranslator, new AbstractCommandAckRequestSetter[]{ThingModifyCommandAckRequestSetter.getInstance(), ThingLiveCommandAckRequestSetter.getInstance(), MessageCommandAckRequestSetter.getInstance()});
        getContext().setReceiveTimeout(httpConfig.getRequestTimeout());
    }

    /**
     * Builds the initial behavior of this actor, i.e. the one active until a command or a
     * {@code Whoami} request has been received.
     * <ul>
     * <li>{@code Status.Failure}: mapped to an HTTP error response, see {@code handleInitialFailure}.</li>
     * <li>{@code Whoami}: handled by {@code handleWhoami}.</li>
     * <li>{@code DittoRuntimeException}: converted to its HTTP response.</li>
     * <li>{@code ReceiveTimeout}: answered with a {@code GatewayServiceUnavailableException} with empty headers.</li>
     * <li>{@code Command}: handled by {@code handleCommand}.</li>
     * <li>anything else: logged and answered with status 500.</li>
     * </ul>
     *
     * @return the initial receive behavior.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Status.Failure.class, this::handleInitialFailure)
                .match(Whoami.class, this::handleWhoami)
                .match(DittoRuntimeException.class, this::handleDittoRuntimeException)
                .match(ReceiveTimeout.class, receiveTimeout -> {
                    handleDittoRuntimeException(GatewayServiceUnavailableException.newBuilder()
                            .dittoHeaders(DittoHeaders.empty())
                            .build());
                })
                .match(Command.class, this::handleCommand)
                .matchAny(unknownMessage -> {
                    this.logger.warning("Got unknown message, expected a 'Command': {}", unknownMessage);
                    completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
                })
                .build();
    }

    /**
     * Maps a failure received while still waiting for the command to an HTTP response.
     * The cause is held as a {@code Throwable} because it may be any exception type, not only a
     * {@code DittoJsonException}; a {@code JsonRuntimeException} is wrapped into a {@code DittoJsonException}
     * first so that it is reported like any other {@code DittoRuntimeException}.
     */
    private void handleInitialFailure(Status.Failure failure) {
        Throwable cause = failure.cause();
        if (cause instanceof JsonRuntimeException) {
            cause = new DittoJsonException((RuntimeException) cause);
        }
        if (cause instanceof DittoRuntimeException) {
            handleDittoRuntimeException((DittoRuntimeException) cause);
        } else if (cause instanceof EntityStreamSizeException) {
            this.logger.warning("Got EntityStreamSizeException when a 'Command' was expected which means that the max. allowed http payload size configured in Akka was overstepped in this request.");
            completeWithResult(createHttpResponse(HttpStatus.REQUEST_ENTITY_TOO_LARGE));
        } else {
            this.logger.error(cause, "Got unknown Status.Failure when a 'Command' was expected.");
            completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
        }
    }

    /**
     * Creates an entity-less HTTP response carrying the code of the given status.
     *
     * @param httpStatus the status whose code is set on the response.
     * @return the new response.
     */
    private static HttpResponse createHttpResponse(HttpStatus httpStatus) {
        int statusCode = httpStatus.getCode();
        return HttpResponse.create().withStatus(statusCode);
    }

    /**
     * Remembers the headers of the incoming command, hands the command to the acknowledgement aggregator
     * starter and then switches to a behavior that completes on {@code Acknowledgements} and otherwise
     * falls back to the response-awaiting behavior. A {@code DittoRuntimeException} thrown on the way is
     * turned into the HTTP response.
     *
     * @param command the received command.
     */
    private void handleCommand(Command<?> command) {
        try {
            this.logger.setCorrelationId(command);
            this.incomingCommandHeaders = command.getDittoHeaders();
            this.ackregatorStarter.start(
                    command,
                    this::onAggregatedResponseOrError,
                    this::handleCommandWithAckregator,
                    this::handleCommandWithoutAckregator);

            AbstractActor.Receive acknowledgementsBehavior = ReceiveBuilder.create()
                    .match(Acknowledgements.class, this::completeAcknowledgements)
                    .build();
            AbstractActor.Receive awaitingBehavior =
                    getResponseAwaitingBehavior(getTimeoutExceptionSupplier(command));
            getContext().become(acknowledgementsBehavior.orElse(awaitingBehavior));
        } catch (DittoRuntimeException e) {
            handleDittoRuntimeException(e);
        }
    }

    /**
     * Re-sends the aggregated response or error to this actor itself, without a sender, so that it is
     * processed by the current behavior.
     *
     * @param obj the aggregated response or error.
     * @return always {@code null}.
     */
    private Void onAggregatedResponseOrError(Object obj) {
        ActorRef self = getSelf();
        self.tell(obj, ActorRef.noSender());
        return null;
    }

    /**
     * Forwards the signal to the proxy actor, using the given actor as sender.
     *
     * @param signal the signal to forward.
     * @param actorRef the actor passed as sender of the forwarded signal.
     * @return always {@code null}.
     */
    private Void handleCommandWithAckregator(Signal<?> signal, ActorRef actorRef) {
        this.logger.debug("Got <{}>. Telling the target actor about it.", signal);
        ActorRef target = this.proxyActor;
        target.tell(signal, actorRef);
        return null;
    }

    /**
     * Handles a signal for which no acknowledgement aggregator is involved: DevOps commands and signals
     * that must not be accepted immediately are forwarded and awaited, all others are forwarded and
     * answered right away.
     *
     * @param signal the signal to handle.
     * @return always {@code null}.
     */
    private Void handleCommandWithoutAckregator(Signal<?> signal) {
        boolean awaitResponse = isDevOpsCommand(signal) || !shallAcceptImmediately(signal);
        if (awaitResponse) {
            AbstractActor.Receive awaitingBehavior =
                    getResponseAwaitingBehavior(getTimeoutExceptionSupplier(signal));
            handleCommandWithResponse(signal, awaitingBehavior);
        } else {
            handleCommandAndAcceptImmediately(signal);
        }
        return null;
    }

    /**
     * Completes the HTTP request with a response built from the aggregated acknowledgements, using their
     * headers and status and the acknowledgements as mapped for HTTP.
     *
     * @param acknowledgements the aggregated acknowledgements.
     */
    private void completeAcknowledgements(Acknowledgements acknowledgements) {
        DittoHeaders ackHeaders = acknowledgements.getDittoHeaders();
        HttpStatus aggregatedStatus = acknowledgements.getHttpStatus();
        // Mapping may remember the response location URI, which the response creation below reads.
        Acknowledgements mappedAcks = mapAcknowledgementsForHttp(acknowledgements);
        HttpResponse response = createCommandResponse(ackHeaders, aggregatedStatus, mappedAcks);
        completeWithResult(response);
    }

    /**
     * Forwards the signal to the proxy actor with this actor as sender and completes the HTTP request
     * with status 202 without waiting for a response.
     *
     * @param signal the signal to forward.
     */
    private void handleCommandAndAcceptImmediately(Signal<?> signal) {
        this.logger.debug("Received <{}> that doesn't expect a response. Answering with status code 202 ...", signal);
        ActorRef self = getSelf();
        this.proxyActor.tell(signal, self);
        HttpResponse accepted = createHttpResponse(HttpStatus.ACCEPTED);
        completeWithResult(accepted);
    }

    /**
     * Returns a supplier that builds a {@code GatewayCommandTimeoutException} on demand. The receive
     * timeout and the headers are read when the supplier is invoked, not when it is created.
     *
     * @param withDittoHeaders provider of the headers set on the exception.
     * @return the supplier of the timeout exception.
     */
    private Supplier<DittoRuntimeException> getTimeoutExceptionSupplier(WithDittoHeaders<?> withDittoHeaders) {
        return () -> GatewayCommandTimeoutException.newBuilder(getContext().getReceiveTimeout())
                .dittoHeaders(withDittoHeaders.getDittoHeaders())
                .build();
    }

    /**
     * Stores the URI for the Location header if, and only if, the response has status CREATED.
     *
     * @param commandResponse the response to inspect.
     */
    private void rememberResponseLocationUri(CommandResponse<?> commandResponse) {
        if (!HttpStatus.CREATED.equals(commandResponse.getHttpStatus())) {
            return;
        }
        this.responseLocationUri = getUriForLocationHeader(this.httpRequest, commandResponse);
    }

    /**
     * Translates the given headers with the header translator ("to external and retain known headers")
     * and wraps the outcome as {@code DittoHeaders}.
     *
     * @param dittoHeaders the internal headers.
     * @return the translated headers.
     */
    private DittoHeaders getExternalHeaders(DittoHeaders dittoHeaders) {
        return DittoHeaders.of(
                this.headerTranslator.toExternalAndRetainKnownHeaders(dittoHeaders)
        );
    }

    /**
     * Answers a {@code Whoami} request: creates the response, switches to the response-awaiting behavior
     * and sends the response to this actor itself so that the new behavior turns it into the HTTP response.
     *
     * @param whoami the received request.
     */
    private void handleWhoami(Whoami whoami) {
        // The format string needs a placeholder, otherwise the argument is silently dropped from the log.
        this.logger.withCorrelationId(whoami).debug("Got <{}>.", whoami);
        AbstractActor.ActorContext context = getContext();
        // Create the response before switching behavior so that a failure here leaves the behavior untouched.
        WhoamiResponse whoamiResponse = createWhoamiResponse(whoami);
        context.become(getResponseAwaitingBehavior(getTimeoutExceptionSupplier(whoami)));
        getSelf().tell(whoamiResponse, getSender());
    }

    /**
     * Creates the response for a {@code Whoami} request from the authorization context found in the
     * request's headers.
     *
     * @param whoami the request.
     * @return the response carrying the user information and the request's headers.
     */
    protected WhoamiResponse createWhoamiResponse(Whoami whoami) {
        DittoHeaders requestHeaders = whoami.getDittoHeaders();
        AuthorizationContext authorizationContext = getAuthContextWithPrefixedSubjectsFromHeaders(requestHeaders);
        return WhoamiResponse.of(
                DefaultUserInformation.fromAuthorizationContext(authorizationContext),
                requestHeaders);
    }

    /**
     * Reads the authorization context header value and builds an {@code AuthorizationContext} from it;
     * an empty JSON object is used when the header is absent.
     *
     * @param dittoHeaders the headers to read from.
     * @return the authorization context.
     */
    @Deprecated
    private static AuthorizationContext getAuthContextWithPrefixedSubjectsFromHeaders(DittoHeaders dittoHeaders) {
        String headerValue = (String) dittoHeaders.get(DittoHeaderDefinition.AUTHORIZATION_CONTEXT.getKey());
        if (headerValue == null) {
            return AuthorizationModelFactory.newAuthContext(JsonObject.empty());
        }
        return AuthorizationModelFactory.newAuthContext(JsonObject.of(headerValue));
    }

    /**
     * Forwards the signal to the proxy actor with this actor as sender and switches to the given behavior.
     * For anything but DevOps commands the receive timeout is set to the timeout from the signal's headers,
     * falling back to the configured default timeout.
     *
     * @param signal the signal to forward.
     * @param receive the behavior to become after forwarding.
     */
    private void handleCommandWithResponse(Signal<?> signal, AbstractActor.Receive receive) {
        this.logger.debug("Got <{}>. Telling the target actor about it.", signal);
        this.proxyActor.tell(signal, getSelf());

        AbstractActor.ActorContext actorContext = getContext();
        DittoHeaders signalHeaders = signal.getDittoHeaders();
        boolean devOpsCommand = isDevOpsCommand(signal);
        if (!devOpsCommand) {
            Duration effectiveTimeout =
                    (Duration) signalHeaders.getTimeout().orElse(this.commandConfig.getDefaultTimeout());
            actorContext.setReceiveTimeout(effectiveTimeout);
        }
        actorContext.become(receive);
    }

    /**
     * Tells whether the given signal is a {@code DevOpsCommand}.
     *
     * @param signal the signal to check, may be {@code null}.
     * @return {@code true} if the signal is an instance of {@code DevOpsCommand}.
     */
    private static boolean isDevOpsCommand(Signal<?> signal) {
        return DevOpsCommand.class.isInstance(signal);
    }

    /**
     * Builds the behavior that is active while this actor waits for the response to a forwarded signal.
     * The clauses are registered in this order (the order is significant because several clauses match
     * the same class):
     * <ol>
     * <li>{@link #COMPLETE_MESSAGE}: only logged.</li>
     * <li>{@code HttpResponse}: used as result directly.</li>
     * <li>{@code MessageCommandResponse}: converted by {@code handleMessageResponseMessage}.</li>
     * <li>{@code CommandResponse} implementing {@code WithEntity}: status, external headers and entity
     * (plain string if present, otherwise the JSON entity) are put on the response.</li>
     * <li>{@code CommandResponse} implementing {@code WithOptionalEntity}: converted by
     * {@code createCommandResponse}.</li>
     * <li>{@code ErrorResponse}: its {@code DittoRuntimeException} is converted to the response.</li>
     * <li>any other {@code CommandResponse}: logged as error, status 500.</li>
     * <li>{@code JsonValueSourceRef}: handled by {@code handleJsonValueSourceRef}.</li>
     * <li>{@code Status.Failure} caused by an {@code AskTimeoutException}: logged as error, status 500.</li>
     * <li>{@code JsonRuntimeException}: wrapped into a {@code DittoJsonException} and converted.</li>
     * <li>{@code DittoRuntimeException}: converted to the response.</li>
     * <li>{@code ReceiveTimeout}: answered with the exception obtained from the given supplier.</li>
     * <li>{@code Status.Failure} caused by a {@code DittoRuntimeException}: the cause is converted.</li>
     * <li>any other {@code Status.Failure}: logged as error, status 500.</li>
     * <li>anything else: logged as error, status 500.</li>
     * </ol>
     *
     * @param supplier supplies the exception to respond with when the receive timeout fires.
     * @return the response-awaiting behavior.
     */
    private AbstractActor.Receive getResponseAwaitingBehavior(Supplier<DittoRuntimeException> supplier) {
        return ReceiveBuilder.create().matchEquals(COMPLETE_MESSAGE, str -> {
            this.logger.debug("Got stream's <{}> message.", COMPLETE_MESSAGE);
        }).match(HttpResponse.class, this::completeWithResult).match(MessageCommandResponse.class, messageCommandResponse -> {
            completeWithResult(handleMessageResponseMessage(messageCommandResponse));
        }).match(CommandResponse.class, commandResponse -> {
            return commandResponse instanceof WithEntity;
        }, commandResponse2 -> {
            this.logger.withCorrelationId(commandResponse2).debug("Got <{}> message.", commandResponse2.getType());
            rememberResponseLocationUri(commandResponse2);
            WithEntity withEntity = (WithEntity) commandResponse2;
            HttpResponse enhanceResponseWithExternalDittoHeaders = enhanceResponseWithExternalDittoHeaders(createHttpResponse(commandResponse2.getHttpStatus()), commandResponse2.getDittoHeaders());
            Optional entityPlainString = withEntity.getEntityPlainString();
            org.eclipse.ditto.model.base.headers.contenttype.ContentType contentType = getContentType(commandResponse2.getDittoHeaders());
            // Prefer the plain string entity; otherwise serialize the JSON entity of the implemented schema version.
            completeWithResult(entityPlainString.isPresent() ? addEntityAccordingToContentType(enhanceResponseWithExternalDittoHeaders, (String) entityPlainString.get(), contentType) : addEntityAccordingToContentType(enhanceResponseWithExternalDittoHeaders, withEntity.getEntity(commandResponse2.getImplementedSchemaVersion()).toString(), contentType));
        }).match(CommandResponse.class, commandResponse3 -> {
            return commandResponse3 instanceof WithOptionalEntity;
        }, commandResponse4 -> {
            this.logger.withCorrelationId(commandResponse4).debug("Got <{}> message.", commandResponse4.getType());
            rememberResponseLocationUri(commandResponse4);
            completeWithResult(createCommandResponse(commandResponse4.getDittoHeaders(), commandResponse4.getHttpStatus(), (WithOptionalEntity) commandResponse4));
        }).match(ErrorResponse.class, errorResponse -> {
            handleDittoRuntimeException(errorResponse.getDittoRuntimeException());
        }).match(CommandResponse.class, commandResponse5 -> {
            this.logger.withCorrelationId(commandResponse5).error("Got 'CommandResponse' message which did neither implement 'WithEntity' nor 'WithOptionalEntity': <{}>!", commandResponse5);
            completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
        }).match(JsonValueSourceRef.class, this::handleJsonValueSourceRef).match(Status.Failure.class, failure -> {
            return failure.cause() instanceof AskTimeoutException;
        }, failure2 -> {
            Throwable cause = failure2.cause();
            this.logger.error(cause, "Got <{}> when a command response was expected: <{}>!", cause.getClass().getSimpleName(), cause.getMessage());
            completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
        }).match(JsonRuntimeException.class, jsonRuntimeException -> {
            handleDittoRuntimeException(new DittoJsonException(jsonRuntimeException));
        }).match(DittoRuntimeException.class, this::handleDittoRuntimeException).match(ReceiveTimeout.class, receiveTimeout -> {
            handleReceiveTimeout(supplier);
        }).match(Status.Failure.class, failure3 -> {
            return failure3.cause() instanceof DittoRuntimeException;
        }, failure4 -> {
            handleDittoRuntimeException((DittoRuntimeException) failure4.cause());
        }).match(Status.Failure.class, failure5 -> {
            Throwable cause = failure5.cause();
            this.logger.error(cause.fillInStackTrace(), "Got <Status.Failure> when a command response was expected: <{}>!", cause.getMessage());
            completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
        }).matchAny(obj -> {
            this.logger.error("Got unknown message when a command response was expected: <{}>!", obj);
            completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
        }).build();
    }

    /**
     * Converts the response of a message command into an HTTP response.
     * <p>
     * The status of the command response is only used if Akka's {@code StatusCodes.lookup} knows its code
     * and it is not BAD_GATEWAY. If the resulting status is absent or is not NO_CONTENT, a response with
     * that status (OK when absent) is created and an entity is chosen as follows: the payload's string form
     * when a payload and a parsable content type are present and the content type is not binary; the raw
     * payload with the parsed content type when it is binary; the raw payload without explicit content type
     * when only a raw payload is present. If the status is NO_CONTENT, a present raw payload is only logged
     * and a response without entity is created. In all cases the external headers of the command response
     * are put on the result.
     *
     * @param messageCommandResponse the message command response to convert.
     * @return the HTTP response.
     */
    private HttpResponse handleMessageResponseMessage(MessageCommandResponse<?, ?> messageCommandResponse) {
        HttpResponse createHttpResponse;
        Message message = messageCommandResponse.getMessage();
        Optional payload = message.getPayload();
        Optional rawPayload = message.getRawPayload();
        // Keep the status only if Akka knows its code and it is not BAD_GATEWAY.
        Optional filter = Optional.of(messageCommandResponse.getHttpStatus()).filter(httpStatus -> {
            return StatusCodes.lookup(httpStatus.getCode()).isPresent();
        }).filter(httpStatus2 -> {
            return !HttpStatus.BAD_GATEWAY.equals(httpStatus2);
        });
        // A body is allowed when there is no usable status or the status is anything but NO_CONTENT.
        if (((Boolean) filter.map(httpStatus3 -> {
            return Boolean.valueOf(!HttpStatus.NO_CONTENT.equals(httpStatus3));
        }).orElse(true)).booleanValue()) {
            Optional contentType = message.getContentType();
            ContentType$ contentType$ = ContentType$.MODULE$;
            Objects.requireNonNull(contentType$);
            // Parsed Akka content type; empty when the message has none or it cannot be parsed.
            Optional map = contentType.map(contentType$::parse).filter((v0) -> {
                return v0.isRight();
            }).map((v0) -> {
                return v0.right();
            }).map((v0) -> {
                return v0.get();
            });
            // True when a parsed content type exists and the Ditto content type built from it is binary.
            boolean isPresent = map.map((v0) -> {
                return v0.value();
            }).map((v0) -> {
                return org.eclipse.ditto.model.base.headers.contenttype.ContentType.of(v0);
            }).filter((v0) -> {
                return v0.isBinary();
            }).isPresent();
            createHttpResponse = createHttpResponse((HttpStatus) filter.orElse(HttpStatus.OK));
            if (payload.isPresent() && map.isPresent() && !isPresent) {
                createHttpResponse = (HttpResponse) createHttpResponse.withEntity(HttpEntities.create((akka.http.scaladsl.model.ContentType) map.get(), ByteString.fromString(payload.get().toString())));
            } else if (rawPayload.isPresent() && map.isPresent() && isPresent) {
                // NOTE(review): ByteBuffer.array() requires an accessible backing array and ignores
                // position/limit/offset — confirm the raw payload buffers always satisfy that.
                createHttpResponse = (HttpResponse) createHttpResponse.withEntity(HttpEntities.create((akka.http.scaladsl.model.ContentType) map.get(), ((ByteBuffer) rawPayload.get()).array()));
            } else if (rawPayload.isPresent()) {
                createHttpResponse = (HttpResponse) createHttpResponse.withEntity(HttpEntities.create(((ByteBuffer) rawPayload.get()).array()));
            }
        } else {
            // Status is NO_CONTENT: a raw payload cannot be delivered, so it is only reported in the log.
            rawPayload.ifPresent(byteBuffer -> {
                this.logger.withCorrelationId(messageCommandResponse).info("Response payload was set but response status code was also set to <{}>. Ignoring the response payload. Command=<{}>", filter, messageCommandResponse);
            });
            createHttpResponse = createHttpResponse(HttpStatus.NO_CONTENT);
        }
        return enhanceResponseWithExternalDittoHeaders(createHttpResponse, messageCommandResponse.getDittoHeaders());
    }

    /**
     * Handles a receive timeout while a response was awaited: obtains the exception from the supplier,
     * logs it and responds with it.
     *
     * @param supplier supplies the exception describing the timeout.
     */
    private void handleReceiveTimeout(Supplier<DittoRuntimeException> supplier) {
        DittoRuntimeException timeoutException = supplier.get();
        DittoDiagnosticLoggingAdapter correlatedLogger = this.logger.withCorrelationId(timeoutException);
        correlatedLogger.info("Got <{}> when a response was expected after timeout <{}>.",
                ReceiveTimeout.class.getSimpleName(),
                getContext().getReceiveTimeout());
        handleDittoRuntimeException(timeoutException);
    }

    /**
     * Logs the exception and completes the HTTP request with the response built from it, enhanced with
     * the external form of the exception's headers.
     *
     * @param dittoRuntimeException the exception to respond with.
     */
    private void handleDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        this.logger.withCorrelationId(dittoRuntimeException)
                .info("DittoRuntimeException <{}>: <{}>.", dittoRuntimeException.getErrorCode(), dittoRuntimeException.getMessage());
        HttpResponse bareResponse = buildResponseWithoutHeadersFromDittoRuntimeException(dittoRuntimeException);
        HttpResponse enhancedResponse =
                enhanceResponseWithExternalDittoHeaders(bareResponse, dittoRuntimeException.getDittoHeaders());
        completeWithResult(enhancedResponse);
    }

    /**
     * Builds a header-less HTTP response from the exception: its status plus, unless the status is
     * NOT_MODIFIED, its JSON string as JSON entity.
     *
     * @param dittoRuntimeException the exception to convert.
     * @return the response.
     */
    private static HttpResponse buildResponseWithoutHeadersFromDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        HttpStatus exceptionStatus = dittoRuntimeException.getHttpStatus();
        HttpResponse bareResponse = createHttpResponse(exceptionStatus);
        if (HttpStatus.NOT_MODIFIED.equals(exceptionStatus)) {
            return bareResponse;
        }
        ByteString jsonBody = ByteString.fromString(dittoRuntimeException.toJsonString());
        return (HttpResponse) bareResponse.withEntity(CONTENT_TYPE_JSON, jsonBody);
    }

    /**
     * Returns a response carrying the external form of the given headers as raw HTTP headers; the content
     * type header is left out. If there are no external headers the given response is returned unchanged.
     *
     * @param httpResponse the response to enhance.
     * @param dittoHeaders the internal headers to translate.
     * @return the enhanced response, or {@code httpResponse} itself when nothing was to be added.
     */
    private HttpResponse enhanceResponseWithExternalDittoHeaders(HttpResponse httpResponse, DittoHeaders dittoHeaders) {
        DittoDiagnosticLoggingAdapter correlatedLogger = this.logger.withCorrelationId(dittoHeaders);
        DittoHeaders externalHeaders = getExternalHeaders(dittoHeaders);
        if (!externalHeaders.isEmpty()) {
            correlatedLogger.debug("Enhancing response with external headers <{}>.", externalHeaders);
            String contentTypeKey = DittoHeaderDefinition.CONTENT_TYPE.getKey();
            return (HttpResponse) httpResponse.withHeaders((List) externalHeaders.entrySet()
                    .stream()
                    .filter(header -> !((String) header.getKey()).equalsIgnoreCase(contentTypeKey))
                    .map(header -> RawHeader.create((String) header.getKey(), (String) header.getValue()))
                    .collect(Collectors.toList()));
        }
        correlatedLogger.debug("No external headers for enhancing the response, returning it as-is.");
        return httpResponse;
    }

    /**
     * Completes the response future and stops this actor. If no response is required and the given
     * response is a success, a bare 202 response is sent instead of it.
     *
     * @param httpResponse the response determined so far.
     */
    private void completeWithResult(HttpResponse httpResponse) {
        boolean sendAsIs = isResponseRequired() || !httpResponse.status().isSuccess();
        HttpResponse effectiveResponse;
        if (sendAsIs) {
            effectiveResponse = httpResponse;
        } else {
            effectiveResponse = createHttpResponse(HttpStatus.ACCEPTED);
        }

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Responding with HTTP response code <{}>.", Integer.valueOf(effectiveResponse.status().intValue()));
            this.logger.debug("Responding with entity <{}>.", effectiveResponse.entity());
        }

        this.httpResponseFuture.complete(effectiveResponse);
        stop();
    }

    /**
     * Clears the logger's MDC and stops this actor.
     */
    private void stop() {
        this.logger.clearMDC();
        ActorRef self = getSelf();
        getContext().stop(self);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sets the given string as entity of the response using the given content type. For a binary content
     * type the string is Base64-decoded first, otherwise it is used as text.
     *
     * @param httpResponse the response to add the entity to.
     * @param str the entity as string (Base64 for binary content types).
     * @param contentType the Ditto content type of the entity.
     * @return the response with the entity set.
     */
    public static HttpResponse addEntityAccordingToContentType(HttpResponse httpResponse, String str, org.eclipse.ditto.model.base.headers.contenttype.ContentType contentType) {
        // Parse the content type first so that failures surface in the same order as before.
        ContentType akkaContentType = ContentTypes.parse(contentType.getValue());
        ByteString entityBytes;
        if (contentType.isBinary()) {
            entityBytes = ByteString.fromArray(Base64.getDecoder().decode(str));
        } else {
            entityBytes = ByteString.fromString(str);
        }
        return (HttpResponse) httpResponse.withEntity(akkaContentType, entityBytes);
    }

    /**
     * Returns the Ditto content type from the headers, or application/json when the headers carry none.
     *
     * @param dittoHeaders the headers to read the content type from.
     * @return the content type to use.
     */
    private static org.eclipse.ditto.model.base.headers.contenttype.ContentType getContentType(DittoHeaders dittoHeaders) {
        org.eclipse.ditto.model.base.headers.contenttype.ContentType fallback =
                org.eclipse.ditto.model.base.headers.contenttype.ContentType.APPLICATION_JSON;
        return (org.eclipse.ditto.model.base.headers.contenttype.ContentType) dittoHeaders.getDittoContentType()
                .orElse(fallback);
    }

    /**
     * Builds the HTTP response for a command response in four steps: create a response with the status,
     * enhance it with the external headers, let {@code modifyResponse} adjust it and finally add the
     * optional entity as body.
     *
     * @param dittoHeaders the headers of the command response.
     * @param httpStatus the status of the command response.
     * @param withOptionalEntity provider of the optional entity.
     * @return the HTTP response.
     */
    private HttpResponse createCommandResponse(DittoHeaders dittoHeaders, HttpStatus httpStatus, WithOptionalEntity withOptionalEntity) {
        UnaryOperator<HttpResponse> bodyAdder = createBodyAddingResponseMapper(dittoHeaders, withOptionalEntity);
        HttpResponse statusOnly = createHttpResponse(httpStatus);
        HttpResponse withHeaders = enhanceResponseWithExternalDittoHeaders(statusOnly, dittoHeaders);
        HttpResponse modified = modifyResponse(withHeaders);
        return bodyAdder.apply(modified);
    }

    /**
     * Creates an operator that adds the optional entity as body to a response. Responses with status
     * NO_CONTENT, and responses for which no entity exists, are returned unchanged. The entity is obtained
     * for the schema version from the headers, falling back to the implemented schema version.
     *
     * @param dittoHeaders the headers providing schema version and content type.
     * @param withOptionalEntity provider of the optional entity.
     * @return the body adding operator.
     */
    private static UnaryOperator<HttpResponse> createBodyAddingResponseMapper(DittoHeaders dittoHeaders, WithOptionalEntity withOptionalEntity) {
        return response -> {
            if (StatusCodes.NO_CONTENT.equals(response.status())) {
                return response;
            }
            JsonSchemaVersion schemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion()
                    .orElse(dittoHeaders.getImplementedSchemaVersion());
            return (HttpResponse) withOptionalEntity.getEntity(schemaVersion)
                    .map(entity -> addEntityAccordingToContentType(response, entity.toString(), getContentType(dittoHeaders)))
                    .orElse(response);
        };
    }

    /**
     * Adds a Location header to the response if its status is CREATED and a response location URI has
     * been remembered; otherwise returns the response unchanged.
     *
     * @param httpResponse the response to modify.
     * @return the possibly modified response.
     */
    protected HttpResponse modifyResponse(HttpResponse httpResponse) {
        boolean created = HttpStatus.CREATED.getCode() == httpResponse.status().intValue();
        if (!created || null == this.responseLocationUri) {
            return httpResponse;
        }
        return (HttpResponse) httpResponse.addHeader(Location.create(this.responseLocationUri));
    }

    /**
     * Determines the URI for the Location header by means of a {@code UriForLocationHeaderSupplier}.
     *
     * @param httpRequest the HTTP request being served.
     * @param commandResponse the command response for which the location is determined.
     * @return the URI for the Location header.
     */
    protected Uri getUriForLocationHeader(HttpRequest httpRequest, CommandResponse<?> commandResponse) {
        UriForLocationHeaderSupplier locationSupplier = new UriForLocationHeaderSupplier(httpRequest, commandResponse);
        return locationSupplier.get();
    }

    /**
     * Adapts the aggregated acknowledgements for the HTTP response.
     * <ul>
     * <li>Response required: every acknowledgement is passed through
     * {@code setResponseLocationForAcknowledgement}.</li>
     * <li>No response required and overall success: the acknowledgements are returned as they are.</li>
     * <li>No response required and no success: each acknowledgement is reduced to label, entity ID and
     * status with empty headers.</li>
     * </ul>
     *
     * @param acknowledgements the aggregated acknowledgements.
     * @return the acknowledgements to build the HTTP response from.
     */
    private Acknowledgements mapAcknowledgementsForHttp(Acknowledgements acknowledgements) {
        if (isResponseRequired()) {
            return Acknowledgements.of((Collection) acknowledgements.stream()
                    .map(this::setResponseLocationForAcknowledgement)
                    .collect(Collectors.toList()), acknowledgements.getDittoHeaders());
        }
        if (acknowledgements.getHttpStatus().isSuccess()) {
            return acknowledgements;
        }
        return Acknowledgements.of((List) acknowledgements.stream()
                .map(ack -> Acknowledgement.of(ack.getLabel(), ack.getEntityId(), ack.getHttpStatus(), DittoHeaders.empty()))
                .collect(Collectors.toList()), acknowledgements.getDittoHeaders());
    }

    /**
     * Tells whether a response is required: always when no command headers are known yet, otherwise as
     * stated by the headers of the incoming command.
     *
     * @return {@code true} if a response is required.
     */
    private boolean isResponseRequired() {
        DittoHeaders commandHeaders = this.incomingCommandHeaders;
        if (commandHeaders == null) {
            return true;
        }
        return commandHeaders.isResponseRequired();
    }

    /**
     * For an acknowledgement with the twin-persisted label, remembers the response location URI and, if
     * one is known afterwards, returns the acknowledgement with a Location header added to its headers.
     * All other acknowledgements are returned unchanged.
     *
     * @param acknowledgement the acknowledgement to process.
     * @return the acknowledgement, possibly with an added Location header.
     */
    private Acknowledgement setResponseLocationForAcknowledgement(Acknowledgement acknowledgement) {
        if (!DittoAcknowledgementLabel.TWIN_PERSISTED.equals(acknowledgement.getLabel())) {
            return acknowledgement;
        }
        rememberResponseLocationUri(acknowledgement);
        if (this.responseLocationUri == null) {
            return acknowledgement;
        }
        Location locationHeader = Location.create(this.responseLocationUri);
        return acknowledgement.setDittoHeaders(acknowledgement.getDittoHeaders()
                .toBuilder()
                .putHeader(locationHeader.lowercaseName(), locationHeader.value())
                .build());
    }

    /**
     * Tells whether a signal can be answered right away: it requires no response and requests no
     * acknowledgements.
     *
     * @param withDittoHeaders provider of the headers to inspect.
     * @return {@code true} if the signal may be accepted immediately.
     */
    private static boolean shallAcceptImmediately(WithDittoHeaders<?> withDittoHeaders) {
        DittoHeaders headers = withDittoHeaders.getDittoHeaders();
        if (headers.isResponseRequired()) {
            return false;
        }
        return headers.getAcknowledgementRequests().isEmpty();
    }

    /**
     * Completes the HTTP request with the response created from the received source reference, enhanced
     * with the external form of the incoming command's headers.
     *
     * @param jsonValueSourceRef the received source reference.
     */
    private void handleJsonValueSourceRef(JsonValueSourceRef jsonValueSourceRef) {
        this.logger.debug("Received <{}> from <{}>.", jsonValueSourceRef.getClass().getSimpleName(), getSender());
        HttpResponse sourceResponse = JsonValueSourceToHttpResponse.getInstance().apply(jsonValueSourceRef.getSource());
        DittoHeaders commandHeaders = this.incomingCommandHeaders;
        // HttpResponse is immutable, so the enhanced copy returned by the helper has to be used; the
        // previous code dropped it and sent the response without the external headers.
        // The headers are nullable (no command received yet); in that case there is nothing to add.
        // NOTE(review): the helper uses withHeaders, which replaces headers already present on the
        // response — confirm that JsonValueSourceToHttpResponse does not set any headers of its own.
        HttpResponse result = commandHeaders == null
                ? sourceResponse
                : enhanceResponseWithExternalDittoHeaders(sourceResponse, commandHeaders);
        completeWithResult(result);
    }
}
