package io.datarouter.metric.counter.collection;

import com.google.gson.Gson;
import io.datarouter.conveyor.message.ConveyorMessage;
import io.datarouter.instrumentation.count.CountBatchDto;
import io.datarouter.metric.DatarouterMetricExecutors;
import io.datarouter.metric.config.DatarouterCountSettingRoot;
import io.datarouter.metric.counter.CountBlobService;
import io.datarouter.metric.counter.DatarouterCountPublisherDao;
import io.datarouter.scanner.Scanner;
import io.datarouter.util.UlidTool;
import io.datarouter.util.number.RandomTool;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/metric/counter/collection/CountFlusher.class */
/**
 * Buffers batches of counts handed over through {@link #saveCounts(Map)} and periodically drains them to the
 * conveyor publisher dao and/or the count blob service, depending on the current settings.
 *
 * <p>Each batch is a map keyed by a {@code Long} (logged as "timestamps") whose values map a {@code String}
 * to a {@code Long}. Instances are obtained from {@link CountFlusherFactory}; the constructor registers the
 * recurring flush task on the injected scheduler.
 */
public class CountFlusher {
    private static final Logger logger = LoggerFactory.getLogger(CountFlusher.class);

    /** Capacity of the pending-batch queue; batches offered while it is full are rejected. */
    private static final int FLUSH_QUEUE_CAPACITY = 60;
    /** Initial delay, in seconds, handed to {@code scheduleWithFixedDelay}. */
    private static final long FLUSH_INITIAL_DELAY_SECONDS = 0L;
    /** Delay, in seconds, handed to {@code scheduleWithFixedDelay} between flush runs. */
    private static final long FLUSH_DELAY_SECONDS = 1L;

    private final String serviceName;
    private final String serverName;
    private final Gson gson;
    private final DatarouterCountPublisherDao publisherDao;
    private final CountBlobService countBlobService;
    private final Queue<Map<Long, Map<String, Long>>> flushQueue = new ArrayBlockingQueue<>(FLUSH_QUEUE_CAPACITY);
    private final DatarouterMetricExecutors.DatarouterCountFlushSchedulerExecutor flushScheduler;
    private final DatarouterCountSettingRoot settings;

    /**
     * Injectable factory that supplies the shared collaborators so callers only provide the service and
     * server names.
     */
    @Singleton
    public static class CountFlusherFactory {

        @Inject
        private Gson gson;

        @Inject
        private CountBlobService countBlobService;

        @Inject
        private DatarouterCountPublisherDao publisherDao;

        @Inject
        private DatarouterMetricExecutors.DatarouterCountFlushSchedulerExecutor flushScheduler;

        @Inject
        private DatarouterCountSettingRoot settings;

        /**
         * Creates a flusher and, as a side effect of construction, schedules its recurring flush task.
         *
         * @param serviceName service name stamped on every published batch
         * @param serverName server name stamped on every published batch
         * @return a new flusher bound to the injected collaborators
         */
        public CountFlusher create(String serviceName, String serverName) {
            return new CountFlusher(
                    serviceName,
                    serverName,
                    gson,
                    publisherDao,
                    countBlobService,
                    flushScheduler,
                    settings);
        }
    }

    private CountFlusher(
            String serviceName,
            String serverName,
            Gson gson,
            DatarouterCountPublisherDao publisherDao,
            CountBlobService countBlobService,
            DatarouterMetricExecutors.DatarouterCountFlushSchedulerExecutor flushScheduler,
            DatarouterCountSettingRoot settings) {
        this.serviceName = serviceName;
        this.serverName = serverName;
        this.gson = gson;
        this.publisherDao = publisherDao;
        this.countBlobService = countBlobService;
        this.flushScheduler = flushScheduler;
        this.settings = settings;
        // Scheduled last so that every field is assigned before the task can first run.
        this.flushScheduler.scheduleWithFixedDelay(
                this::flush,
                FLUSH_INITIAL_DELAY_SECONDS,
                FLUSH_DELAY_SECONDS,
                TimeUnit.SECONDS);
    }

    /**
     * Queues a batch of counts for a later flush. If the queue does not accept the batch, the batch is
     * dropped and a warning is logged; no exception is thrown for a full queue.
     *
     * @param counts batch to enqueue; must not be {@code null} (the underlying queue rejects null elements)
     */
    public void saveCounts(Map<Long, Map<String, Long>> counts) {
        if (!flushQueue.offer(counts)) {
            logger.warn("flushQueue rejected our counts");
        }
    }

    /**
     * Drains the queue. When both saving settings are off the queue is simply cleared. Any failure is
     * logged and swallowed so it does not propagate to the scheduler.
     */
    private void flush() {
        try {
            if (!isTrue(settings.saveCounts.get()) && !isTrue(settings.saveCountBlobs.get())) {
                flushQueue.clear();
                return;
            }
            // peek, not poll: a batch is removed only after it has been processed, so an exception thrown
            // while publishing leaves it at the head of the queue for the next run.
            Map<Long, Map<String, Long>> counts;
            while ((counts = flushQueue.peek()) != null) {
                warnOnEmptyBuckets(counts);
                publishCounts(counts);
                flushCountBlobs(counts);
                flushQueue.poll();
            }
        } catch (Throwable th) {
            logger.warn("failed to flush counts", th);
        }
    }

    /** Logs a warning listing every key of the batch whose inner map is empty. */
    private static void warnOnEmptyBuckets(Map<Long, Map<String, Long>> counts) {
        Scanner.of(counts.keySet())
                .include(timestamp -> counts.get(timestamp).isEmpty())
                .map(Object::toString)
                .flush(emptyTimestamps -> {
                    if (!emptyTimestamps.isEmpty()) {
                        logger.warn("found empty maps for timestamps={}", String.join(",", emptyTimestamps));
                    }
                });
    }

    /**
     * Serializes the batch, logs its size, and puts it on the publisher dao when {@code saveCounts} is on
     * and the blob-only override is not in effect.
     */
    private void publishCounts(Map<Long, Map<String, Long>> counts) {
        CountBatchDto batch = new CountBatchDto(RandomTool.nextPositiveLong(), serviceName, serverName, counts);
        String json = gson.toJson(batch);
        ConveyorMessage message = new ConveyorMessage(UlidTool.nextUlid(), json);
        logCountsSpec(counts, json);
        if (!isTrue(settings.saveCounts.get())) {
            return;
        }
        boolean blobsOnly = isTrue(settings.saveCountBlobs.get())
                && isTrue(settings.skipsSaveCountsWhenSaveCountBlobsIsTrue.get());
        if (blobsOnly) {
            logger.warn("skipping save. saving only to blobs instead");
            return;
        }
        publisherDao.put(message);
    }

    /**
     * Hands the batch to the count blob service when {@code saveCountBlobs} is on. Runtime failures are
     * logged and ignored so that the batch is still removed from the queue by the caller.
     */
    private void flushCountBlobs(Map<Long, Map<String, Long>> counts) {
        if (!isTrue(settings.saveCountBlobs.get())) {
            return;
        }
        try {
            // Unlike the conveyor batch, the blob batch is built with a null first argument.
            countBlobService.add(new CountBatchDto((Long) null, serviceName, serverName, counts));
        } catch (RuntimeException e) {
            logger.error("ignoring count blob publishing failure", e);
        }
    }

    /** Logs the number of outer entries, the summed size of the inner maps, and the JSON length. */
    private static void logCountsSpec(Map<Long, Map<String, Long>> counts, String json) {
        int bucketCount = counts.size();
        int nameCount = Scanner.of(counts.values())
                .map(Map::size)
                .reduce(0, Integer::sum);
        logger.info("counts buckets={}, names={}, jsonLength={}", bucketCount, nameCount, json.length());
    }

    /**
     * Unboxes a setting value as a boolean.
     * NOTE(review): the cast is kept because the declared value type of the settings is not visible in this
     * file; it can be dropped if the settings are already typed as {@code Boolean}.
     */
    private static boolean isTrue(Object settingValue) {
        return (Boolean) settingValue;
    }
}
