package io.vlingo.http.resource;

import io.vlingo.actors.Actor;
import io.vlingo.actors.CompletesEventually;
import io.vlingo.actors.Definition;
import io.vlingo.actors.Returns;
import io.vlingo.common.Completes;
import io.vlingo.http.Request;
import io.vlingo.http.RequestHeader;
import io.vlingo.http.Response;
import io.vlingo.http.ResponseHeader;
import io.vlingo.http.ResponseParser;
import io.vlingo.http.resource.Client;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:io/vlingo/http/resource/ClientConsumerWorkerActor.class */
public class ClientConsumerWorkerActor extends Actor implements ClientConsumer {
    private static final String EmptyTestId = "";
    private static final AtomicInteger testIdGenerator = new AtomicInteger(0);
    private final String testId;
    private CompletesEventually completesEventually;
    private ResponseParser parser;
    private final RequestSender requestSender;

    public ClientConsumerWorkerActor(Client.Configuration configuration) throws Exception {
        this.testId = configuration.hasTestInfo() ? Integer.toString(testIdGenerator.incrementAndGet()) : EmptyTestId;
        this.requestSender = startRequestSender(configuration);
        this.parser = null;
    }

    public void consume(ConsumerByteBuffer consumerByteBuffer) {
        try {
            ByteBuffer asByteBuffer = consumerByteBuffer.asByteBuffer();
            if (!asByteBuffer.hasRemaining()) {
                logger().debug("CONSUMER: NO CONTENT");
                consumerByteBuffer.release();
                return;
            }
            logger().debug("CONSUMER:\n" + new String(asByteBuffer.array(), 0, asByteBuffer.remaining()));
            if (this.parser == null) {
                this.parser = ResponseParser.parserFor(asByteBuffer);
            } else {
                this.parser.parseNext(asByteBuffer);
            }
            if (this.parser.hasFullResponse()) {
                Response fullResponse = this.parser.fullResponse();
                if (this.testId != EmptyTestId) {
                    fullResponse.headers.add(ResponseHeader.of(Client.ClientIdCustomHeader, this.testId));
                    logger().debug("Client Worker: " + this.testId + " Consuming");
                    logger().debug("Client Worker: " + this.testId + "\nConsuming:\n" + fullResponse);
                }
                this.completesEventually.with(fullResponse);
                this.completesEventually = null;
                disperseStowedMessages();
            }
            if (!this.parser.isMissingContent()) {
                this.parser = null;
            }
        } finally {
            consumerByteBuffer.release();
        }
    }

    @Override // io.vlingo.http.resource.ClientConsumer
    public Completes<Response> requestWith(Request request, Completes<Response> completes) {
        this.completesEventually = stage().world().completesFor(Returns.value(completes));
        if (this.testId != EmptyTestId) {
            request.headers.add(RequestHeader.of(Client.ClientIdCustomHeader, this.testId));
            request.headers.add(RequestHeader.of("X-Correlation-ID", this.testId));
            logger().debug("Client Worker: " + this.testId + " Requesting");
            logger().debug("Client Worker: " + this.testId + "\nRequesting:\n" + request);
        }
        this.requestSender.sendRequest(request);
        stowMessages(new Class[]{ResponseChannelConsumer.class});
        return completes;
    }

    public void stop() {
        this.requestSender.stop();
        super.stop();
    }

    private RequestSender startRequestSender(Client.Configuration configuration) throws Exception {
        return (RequestSender) childActorFor(RequestSender.class, Definition.has(RequestSenderProbeActor.class, Definition.parameters(new Object[]{configuration, (ResponseChannelConsumer) selfAs(ResponseChannelConsumer.class), this.testId})));
    }
}
