/*
 * Decompiled with CFR 0.152.
 */
package com.azure.monitor.ingestion;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.core.util.serializer.JsonSerializerProviders;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.monitor.ingestion.LogsIngestionClientBuilder;
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesAsyncClient;
import com.azure.monitor.ingestion.implementation.LogsIngestionRequest;
import com.azure.monitor.ingestion.implementation.UploadLogsResponseHolder;
import com.azure.monitor.ingestion.models.LogsUploadError;
import com.azure.monitor.ingestion.models.LogsUploadException;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

@ServiceClient(isAsync=true, builder=LogsIngestionClientBuilder.class)
public final class LogsIngestionAsyncClient {
    private static final ClientLogger LOGGER = new ClientLogger(LogsIngestionAsyncClient.class);
    private static final String CONTENT_ENCODING = "Content-Encoding";
    private static final long MAX_REQUEST_PAYLOAD_SIZE = 0x100000L;
    private static final String GZIP = "gzip";
    private static final JsonSerializer DEFAULT_SERIALIZER = JsonSerializerProviders.createInstance((boolean)true);
    private final IngestionUsingDataCollectionRulesAsyncClient service;

    LogsIngestionAsyncClient(IngestionUsingDataCollectionRulesAsyncClient service) {
        this.service = service;
    }

    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs) {
        return this.upload(ruleId, streamName, logs, new LogsUploadOptions());
    }

    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options) {
        return FluxUtil.withContext(context -> this.upload(ruleId, streamName, logs, options, (Context)context));
    }

    @ServiceMethod(returns=ReturnType.SINGLE)
    public Mono<Response<Void>> uploadWithResponse(String ruleId, String streamName, BinaryData logs, RequestOptions requestOptions) {
        Objects.requireNonNull(ruleId, "'ruleId' cannot be null.");
        Objects.requireNonNull(streamName, "'streamName' cannot be null.");
        Objects.requireNonNull(logs, "'logs' cannot be null.");
        if (requestOptions == null) {
            requestOptions = new RequestOptions();
        }
        requestOptions.addRequestCallback(request -> {
            HttpHeader httpHeader = request.getHeaders().get(CONTENT_ENCODING);
            if (httpHeader == null) {
                BinaryData gzippedRequest = BinaryData.fromBytes((byte[])this.gzipRequest(logs.toBytes()));
                request.setBody(gzippedRequest);
                request.setHeader(CONTENT_ENCODING, GZIP);
            }
        });
        return this.service.uploadWithResponse(ruleId, streamName, logs, requestOptions);
    }

    Mono<Void> upload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options, Context context) {
        return Mono.defer(() -> this.splitAndUpload(ruleId, streamName, logs, options, context));
    }

    private Mono<Void> splitAndUpload(String ruleId, String streamName, Iterable<Object> logs, LogsUploadOptions options, Context context) {
        int concurrency = 1;
        JsonSerializer objectSerializer = DEFAULT_SERIALIZER;
        if (options != null) {
            if (options.getObjectSerializer() != null) {
                objectSerializer = options.getObjectSerializer();
            }
            if (options.getMaxConcurrency() != null) {
                concurrency = options.getMaxConcurrency();
            }
        }
        JsonSerializer serializer = objectSerializer;
        Iterator<Object> iterator = logs.iterator();
        return Flux.create(arg_0 -> this.lambda$splitAndUpload$3((ObjectSerializer)serializer, iterator, arg_0)).flatMapSequential(request -> this.uploadToService(ruleId, streamName, context, (LogsIngestionRequest)request), concurrency).handle((responseHolder, sink) -> this.processResponse(options, (UploadLogsResponseHolder)responseHolder, (SynchronousSink<LogsUploadException>)sink)).collectList().handle((result, sink) -> this.processExceptions((List<LogsUploadException>)result, (SynchronousSink<Void>)sink));
    }

    private void createHttpRequest(ObjectSerializer serializer, Iterator<Object> iterator, FluxSink<LogsIngestionRequest> emitter) {
        try {
            long currentBatchSize = 0L;
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            JsonGenerator generator = JsonFactory.builder().build().createGenerator((OutputStream)byteArrayOutputStream);
            generator.writeStartArray();
            ArrayList<String> serializedLogs = new ArrayList<String>();
            ArrayList<Object> originalLogsRequest = new ArrayList<Object>();
            while (iterator.hasNext()) {
                Object currentLog = iterator.next();
                byte[] bytes = serializer.serializeToBytes(currentLog);
                int currentLogSize = bytes.length;
                if ((currentBatchSize += (long)currentLogSize) > 0x100000L) {
                    this.writeLogsAndCloseJsonGenerator(generator, serializedLogs);
                    byte[] zippedRequestBody = this.gzipRequest(byteArrayOutputStream.toByteArray());
                    emitter.next((Object)new LogsIngestionRequest(originalLogsRequest, zippedRequestBody));
                    byteArrayOutputStream = new ByteArrayOutputStream();
                    generator = JsonFactory.builder().build().createGenerator((OutputStream)byteArrayOutputStream);
                    generator.writeStartArray();
                    currentBatchSize = currentLogSize;
                    originalLogsRequest = new ArrayList();
                    serializedLogs.clear();
                }
                serializedLogs.add(new String(bytes, StandardCharsets.UTF_8));
                originalLogsRequest.add(currentLog);
            }
            if (currentBatchSize > 0L) {
                this.writeLogsAndCloseJsonGenerator(generator, serializedLogs);
                byte[] zippedRequestBody = this.gzipRequest(byteArrayOutputStream.toByteArray());
                emitter.next((Object)new LogsIngestionRequest(originalLogsRequest, zippedRequestBody));
            }
            emitter.complete();
        }
        catch (IOException e) {
            emitter.error((Throwable)e);
        }
    }

    private void processExceptions(List<LogsUploadException> result, SynchronousSink<Void> sink) {
        long failedLogsCount = 0L;
        ArrayList<HttpResponseException> exceptions = new ArrayList<HttpResponseException>();
        for (LogsUploadException exception : result) {
            exceptions.addAll(exception.getLogsUploadErrors());
            failedLogsCount += exception.getFailedLogsCount();
        }
        if (!exceptions.isEmpty()) {
            sink.error((Throwable)((Object)new LogsUploadException(exceptions, failedLogsCount)));
        } else {
            sink.complete();
        }
    }

    private void processResponse(LogsUploadOptions options, UploadLogsResponseHolder responseHolder, SynchronousSink<LogsUploadException> sink) {
        if (responseHolder.getException() != null) {
            Consumer<LogsUploadError> uploadLogsErrorConsumer = null;
            if (options != null) {
                uploadLogsErrorConsumer = options.getLogsUploadErrorConsumer();
            }
            if (uploadLogsErrorConsumer != null) {
                uploadLogsErrorConsumer.accept(new LogsUploadError(responseHolder.getException(), responseHolder.getRequest().getLogs()));
                return;
            }
            sink.next((Object)new LogsUploadException(Collections.singletonList(responseHolder.getException()), responseHolder.getRequest().getLogs().size()));
        }
    }

    private Mono<UploadLogsResponseHolder> uploadToService(String ruleId, String streamName, Context context, LogsIngestionRequest request) {
        RequestOptions requestOptions = new RequestOptions().addHeader(CONTENT_ENCODING, GZIP).setContext(context);
        return this.service.uploadWithResponse(ruleId, streamName, BinaryData.fromBytes((byte[])request.getRequestBody()), requestOptions).map(response -> new UploadLogsResponseHolder(null, null)).onErrorResume(HttpResponseException.class, ex -> Mono.fromSupplier(() -> new UploadLogsResponseHolder(request, (HttpResponseException)ex)));
    }

    private void writeLogsAndCloseJsonGenerator(JsonGenerator generator, List<String> serializedLogs) throws IOException {
        generator.writeRaw(serializedLogs.stream().collect(Collectors.joining(",")));
        generator.writeEndArray();
        generator.close();
    }

    private byte[] gzipRequest(byte[] bytes) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (GZIPOutputStream zip = new GZIPOutputStream(byteArrayOutputStream);){
            zip.write(bytes);
        }
        catch (IOException exception) {
            throw LOGGER.logExceptionAsError((RuntimeException)new UncheckedIOException(exception));
        }
        return byteArrayOutputStream.toByteArray();
    }

    private /* synthetic */ void lambda$splitAndUpload$3(ObjectSerializer serializer, Iterator iterator, FluxSink emitter) {
        this.createHttpRequest(serializer, iterator, (FluxSink<LogsIngestionRequest>)emitter);
    }
}

