package io.continual.onap.services.publisher;

import io.continual.onap.services.mrCommon.OnapMrResponse;
import io.continual.onap.services.publisher.OnapMsgRouterPublisher;
import java.io.Closeable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/continual/onap/services/publisher/OnapMsgRouterBatchPublisher.class */
public class OnapMsgRouterBatchPublisher implements Closeable {
    private final OnapMsgRouterPublisher fPub;
    private final LinkedList<MessageWrapper> fPendingMsgs;
    private final int fMaxBatch;
    private final long fMaxTimeMs;
    private final long fBackoffMs;
    private final long fMaxPendingCount;
    private final DropPolicy fDropPolicy;
    private final Logger fLog;
    private final SvcThread fService;
    private static final Logger defaultLog = LoggerFactory.getLogger(OnapMsgRouterBatchPublisher.class);
    private static final long kEmptyQueueMaxWaitMs = 100;
    private static final int kDefaultMaxBatch = 100;
    private static final long kDefaultMaxTimeMs = 500;
    private static final long kDefaultBackoffTimeMs = 1000;
    private static final long kDefaultMaxPendingCount = -1;

    /* loaded from: input_file:io/continual/onap/services/publisher/OnapMsgRouterBatchPublisher$Builder.class */
    public static class Builder {
        private OnapMsgRouterPublisher fPub = null;
        private Logger fLog = OnapMsgRouterBatchPublisher.defaultLog;
        private int fMaxBatch = OnapMsgRouterBatchPublisher.kDefaultMaxBatch;
        private long fMaxTimeMs = OnapMsgRouterBatchPublisher.kDefaultMaxTimeMs;
        private long fBackoffMs = OnapMsgRouterBatchPublisher.kDefaultBackoffTimeMs;
        private long fMaxPendingCount = OnapMsgRouterBatchPublisher.kDefaultMaxPendingCount;
        private DropPolicy fMaxPendingDropPolicy = DropPolicy.DROP_OLDEST;

        public Builder usingPublisher(OnapMsgRouterPublisher onapMsgRouterPublisher) {
            this.fPub = onapMsgRouterPublisher;
            return this;
        }

        public Builder logTo(Logger logger) {
            this.fLog = logger;
            return this;
        }

        public Builder batchAtMost(int i) {
            this.fMaxBatch = i;
            return this;
        }

        public Builder batchMaxAgeMs(int i) {
            this.fMaxTimeMs = i;
            return this;
        }

        public Builder withMaxPendingCount(int i, DropPolicy dropPolicy) {
            this.fMaxPendingCount = i;
            this.fMaxPendingDropPolicy = dropPolicy;
            return this;
        }

        public Builder retryAfterMs(int i) {
            this.fBackoffMs = i;
            return this;
        }

        public OnapMsgRouterBatchPublisher build() {
            return new OnapMsgRouterBatchPublisher(this);
        }
    }

    /* loaded from: input_file:io/continual/onap/services/publisher/OnapMsgRouterBatchPublisher$DropPolicy.class */
    public enum DropPolicy {
        DROP_OLDEST,
        DROP_NEWEST;

        public static DropPolicy fromSettingString(String str) {
            if (str == null) {
                return DROP_OLDEST;
            }
            try {
                return valueOf(str.trim().toUpperCase());
            } catch (IllegalArgumentException e) {
                return DROP_OLDEST;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/continual/onap/services/publisher/OnapMsgRouterBatchPublisher$MessageWrapper.class */
    public class MessageWrapper {
        public final OnapMsgRouterPublisher.Message fMsg;
        public final long fQueuedAtMs;

        public MessageWrapper(OnapMsgRouterPublisher.Message message) {
            this.fMsg = message;
            this.fQueuedAtMs = OnapMsgRouterBatchPublisher.this.fPub.getClock().nowMs();
        }

        public OnapMsgRouterPublisher.Message message() {
            return this.fMsg;
        }

        public long queuedAtMs() {
            return this.fQueuedAtMs;
        }
    }

    /* loaded from: input_file:io/continual/onap/services/publisher/OnapMsgRouterBatchPublisher$SvcThread.class */
    private class SvcThread extends Thread {
        private boolean fClose;

        private SvcThread() {
            this.fClose = false;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long j = 100;
            while (!shouldClose()) {
                synchronized (OnapMsgRouterBatchPublisher.this.fPendingMsgs) {
                    if (j > 0) {
                        try {
                            OnapMsgRouterBatchPublisher.this.fPendingMsgs.wait(j);
                        } catch (InterruptedException e) {
                            OnapMsgRouterBatchPublisher.this.fLog.warn("Background thread interrupted while waiting for input signal: {}", e.getMessage());
                        }
                    }
                    j = OnapMsgRouterBatchPublisher.this.checkSend();
                    OnapMsgRouterBatchPublisher.this.checkForDrops();
                }
            }
        }

        public synchronized void signalClose() {
            this.fClose = true;
        }

        private synchronized boolean shouldClose() {
            return this.fClose;
        }
    }

    public static OnapMsgRouterBatchPublisher build(OnapMsgRouterPublisher onapMsgRouterPublisher, int i, int i2) {
        return new Builder().usingPublisher(onapMsgRouterPublisher).batchAtMost(i).batchMaxAgeMs(i2).build();
    }

    public synchronized void start() {
        this.fService.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        try {
            this.fService.signalClose();
            this.fService.join();
            synchronized (this.fPendingMsgs) {
                long nowMs = this.fPub.getClock().nowMs() + 60000;
                while (this.fPendingMsgs.size() > 0 && this.fPub.getClock().nowMs() < nowMs) {
                    long send = send();
                    if (send > 0) {
                        Thread.sleep(send);
                    }
                }
                if (this.fPendingMsgs.size() > 0) {
                    this.fLog.warn("Unable to send {} messages before giving up.", Integer.valueOf(this.fPendingMsgs.size()));
                } else {
                    this.fLog.info("Batch sender closed with no pending messages.");
                }
            }
        } catch (InterruptedException e) {
            this.fLog.warn("Interrupted while closing background send thread: {}", e.getMessage());
            Thread.currentThread().interrupt();
        }
    }

    public OnapMsgRouterBatchPublisher send(OnapMsgRouterPublisher.Message message) {
        synchronized (this.fPendingMsgs) {
            this.fPendingMsgs.add(new MessageWrapper(message));
            this.fPendingMsgs.notify();
        }
        return this;
    }

    public OnapMsgRouterBatchPublisher send(List<OnapMsgRouterPublisher.Message> list) {
        LinkedList linkedList = new LinkedList();
        Iterator<OnapMsgRouterPublisher.Message> it = list.iterator();
        while (it.hasNext()) {
            linkedList.add(new MessageWrapper(it.next()));
        }
        synchronized (this.fPendingMsgs) {
            this.fPendingMsgs.addAll(linkedList);
            this.fPendingMsgs.notify();
        }
        return this;
    }

    private OnapMsgRouterBatchPublisher(Builder builder) {
        this.fPub = builder.fPub;
        this.fLog = builder.fLog;
        this.fPendingMsgs = new LinkedList<>();
        this.fMaxBatch = builder.fMaxBatch;
        this.fMaxTimeMs = builder.fMaxTimeMs;
        this.fBackoffMs = builder.fBackoffMs;
        this.fMaxPendingCount = builder.fMaxPendingCount;
        this.fDropPolicy = builder.fMaxPendingDropPolicy;
        this.fService = new SvcThread();
        if (this.fPub == null) {
            throw new IllegalArgumentException("A publisher must be provided.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long checkSend() {
        long j;
        synchronized (this.fPendingMsgs) {
            int size = this.fPendingMsgs.size();
            long j2 = 100;
            if (size >= this.fMaxBatch || (size > 0 && this.fPendingMsgs.peekFirst().queuedAtMs() + this.fMaxTimeMs <= this.fPub.getClock().nowMs())) {
                j2 = send();
            } else if (size > 0) {
                j2 = Math.max(0L, (this.fPendingMsgs.peekFirst().queuedAtMs() + this.fMaxTimeMs) - this.fPub.getClock().nowMs());
            }
            j = j2;
        }
        return j;
    }

    private long send() {
        synchronized (this.fPendingMsgs) {
            LinkedList linkedList = new LinkedList();
            Iterator<MessageWrapper> it = this.fPendingMsgs.iterator();
            while (it.hasNext()) {
                linkedList.add(it.next().message());
            }
            OnapMrResponse send = this.fPub.send(linkedList);
            if (send.isSuccess()) {
                this.fPendingMsgs.clear();
                return kEmptyQueueMaxWaitMs;
            }
            this.fLog.warn("MR send failed with {} {}. Waiting {} ms for retry.", new Object[]{Integer.valueOf(send.getStatusCode()), send.getStatusText(), Long.valueOf(this.fBackoffMs)});
            return this.fBackoffMs;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkForDrops() {
        if (this.fMaxPendingCount < 0) {
            return;
        }
        int i = 0;
        long j = Long.MAX_VALUE;
        long j2 = Long.MIN_VALUE;
        synchronized (this.fPendingMsgs) {
            while (this.fPendingMsgs.size() > this.fMaxPendingCount) {
                long j3 = 0;
                switch (this.fDropPolicy) {
                    case DROP_NEWEST:
                        j3 = this.fPendingMsgs.removeLast().queuedAtMs();
                        break;
                    case DROP_OLDEST:
                        j3 = this.fPendingMsgs.removeFirst().queuedAtMs();
                        break;
                }
                i++;
                j = Math.min(j, j3);
                j2 = Math.max(j2, j3);
            }
            if (i > 0) {
                this.fLog.warn("Dropped {} messages with time range from {} to {}.", new Object[]{Integer.valueOf(i), Long.valueOf(j), Long.valueOf(j2)});
            }
        }
    }
}
