package io.mats3.util.eagercache;

import com.fasterxml.jackson.databind.ObjectReader;
import io.mats3.MatsEndpoint;
import io.mats3.MatsFactory;
import io.mats3.util.FieldBasedJacksonMapper;
import io.mats3.util.TraceId;
import io.mats3.util.compression.InflaterInputStreamWithStats;
import io.mats3.util.eagercache.MatsEagerCacheServer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient.class */
public class MatsEagerCacheClient<DATA> {
    private static final Logger log = LoggerFactory.getLogger(MatsEagerCacheClient.class);
    /** Prefix put in front of the log messages emitted by this class. */
    public static final String LOG_PREFIX = "#MatsEagerCache#S ";

    // ----- Configuration, fixed at construction time.
    private final MatsFactory _matsFactory;
    private final String _dataName;
    /** Maps a received full update into the DATA object served by {@link #get()}. */
    private final Function<CacheReceivedData<?>, DATA> _fullUpdateMapper;
    /** Jackson reader for the received element type given at construction. */
    private final ObjectReader _receivedDataTypeReader;
    /** Executor that the broadcast handler hands received updates over to. */
    private final ThreadPoolExecutor _receiveSingleBlockingThreadExecutorService;
    /** Optional mapper for partial updates; set via {@link #setPartialUpdateMapper(Function)}. */
    private volatile Function<CacheReceivedPartialData<?, DATA>, DATA> _partialUpdateMapper;

    // ----- Timestamps / statistics exposed through getCacheClientInformation().
    private volatile long _cacheStartedTimestamp;
    private volatile long _initialPopulationRequestSentTimestamp;
    private volatile long _initialPopulationTimestamp;
    private volatile long _lastUpdateTimestamp;
    private volatile double _lastUpdateDurationMillis;

    /** The current cache content; get() reads it while holding the read lock below. */
    private DATA _data;
    private volatile boolean _running;
    private volatile MatsEndpoint<?, ?> _broadcastTerminator;
    private final ReadWriteLock _cacheContentLock = new ReentrantReadWriteLock();
    private final Lock _cacheContentReadLock = this._cacheContentLock.readLock();
    private final Lock _cacheContentWriteLock = this._cacheContentLock.writeLock();
    /**
     * While non-null, get() waits on this latch before reading the data; a null value is reported as "initial
     * population done" by the information view.
     */
    private volatile CountDownLatch _initialPopulationLatch = new CountDownLatch(1);
    /**
     * Tasks registered via addAfterInitialPopulationTask(..) while this list is non-null; once the field is null,
     * such tasks are run immediately instead. (Diamond instead of the former raw {@code ArrayList}.)
     */
    private volatile List<Runnable> _onInitialPopulationTasks = new ArrayList<>();
    private final CopyOnWriteArrayList<Consumer<CacheUpdated>> _cacheUpdatedListeners = new CopyOnWriteArrayList<>();
    private final AtomicInteger _numberOfFullUpdatesReceived = new AtomicInteger();
    private final AtomicInteger _numberOfPartialUpdatesReceived = new AtomicInteger();
    /** Incremented on every successful get(). */
    private final AtomicLong _accessCounter = new AtomicLong();

    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheClientInformation.class */
    /**
     * Read-only view of a cache client's identity, lifecycle flags, timestamps and counters, as handed out by
     * {@link MatsEagerCacheClient#getCacheClientInformation()}.
     */
    public interface CacheClientInformation {
        /** @return the data name the client was created with. */
        String getDataName();

        /** @return the nodename reported by the MatsFactory's factory config. */
        String getNodename();

        /** @return whether the client is currently flagged as running (set by start, cleared by close). */
        boolean isRunning();

        /** @return {@code true} once the client no longer holds an initial-population latch. */
        boolean isInitialPopulationDone();

        /** @return {@code System.currentTimeMillis()} as captured when the client was started. */
        long getCacheStartedTimestamp();

        /** @return {@code System.currentTimeMillis()} as captured right before the boot-time update request was sent. */
        long getInitialPopulationRequestSentTimestamp();

        /** @return timestamp of the initial population - presumably millis since epoch; TODO confirm against the update handler. */
        long getInitialPopulationTimestamp();

        /** @return timestamp of the latest update - presumably millis since epoch; TODO confirm against the update handler. */
        long getLastUpdateTimestamp();

        /** @return duration of the latest update, in milliseconds going by the name. */
        double getLastUpdateDurationMillis();

        /** @return current value of the full-update counter. */
        int getNumberOfFullUpdatesReceived();

        /** @return current value of the partial-update counter. */
        int getNumberOfPartialUpdatesReceived();

        /** @return number of successful {@link MatsEagerCacheClient#get()} invocations so far. */
        long getNumberOfAccesses();
    }

    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheReceived.class */
    /**
     * Metadata describing one received cache update, common to the "received data" and "cache updated" views.
     */
    public interface CacheReceived {
        /** @return {@code true} for a full update, {@code false} for a partial update. */
        boolean isFullUpdate();

        /** @return the data count reported for the update - presumably the number of elements; TODO confirm. */
        int getDataCount();

        /** @return size of the update in its compressed form - presumably bytes; TODO confirm. */
        long getCompressedSize();

        /** @return size of the update in its uncompressed form - presumably bytes; TODO confirm. */
        long getUncompressedSize();

        /** @return the metadata string that accompanied the update; NOTE(review): nullability not visible here. */
        String getMetadata();
    }

    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheReceivedData.class */
    /**
     * A received update including its payload: the {@link CacheReceived} metadata plus a stream of the received
     * elements. This is the argument type of the full-update mapper given to {@link MatsEagerCacheClient#create}.
     *
     * @param <RECV> element type of the received data.
     */
    public interface CacheReceivedData<RECV> extends CacheReceived {
        /** @return the stream of received elements. */
        Stream<RECV> getReceivedDataStream();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheReceivedDataImpl.class */
    /**
     * Plain value-holder implementation of {@link CacheReceivedData}: carries the update metadata and the stream
     * of received elements exactly as given to the constructor.
     *
     * @param <RECV> element type of the received data stream.
     */
    public static class CacheReceivedDataImpl<RECV> implements CacheReceivedData<RECV> {
        protected final boolean _fullUpdate;
        protected final int _dataCount;
        protected final String _metadata;
        protected final long _receivedUncompressedSize;
        protected final long _receivedCompressedSize;
        private final Stream<RECV> _rStream;

        /**
         * @param fullUpdate whether this is a full (as opposed to partial) update.
         * @param dataCount the data count of the update.
         * @param metadata the metadata string of the update.
         * @param uncompressedSize the uncompressed size - note that it comes <i>before</i> the compressed size.
         * @param compressedSize the compressed size.
         * @param receivedStream the stream returned by {@link #getReceivedDataStream()}.
         */
        public CacheReceivedDataImpl(boolean fullUpdate, int dataCount, String metadata, long uncompressedSize,
                long compressedSize, Stream<RECV> receivedStream) {
            _fullUpdate = fullUpdate;
            _dataCount = dataCount;
            _metadata = metadata;
            _receivedUncompressedSize = uncompressedSize;
            _receivedCompressedSize = compressedSize;
            _rStream = receivedStream;
        }

        @Override
        public boolean isFullUpdate() {
            return _fullUpdate;
        }

        @Override
        public int getDataCount() {
            return _dataCount;
        }

        @Override
        public long getUncompressedSize() {
            return _receivedUncompressedSize;
        }

        @Override
        public long getCompressedSize() {
            return _receivedCompressedSize;
        }

        @Override
        public String getMetadata() {
            return _metadata;
        }

        @Override
        public Stream<RECV> getReceivedDataStream() {
            return _rStream;
        }

        /** Renders the update kind, count, metadata and the two (formatted) sizes. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CacheReceivedData[");
            if (_fullUpdate) {
                text.append("FULL");
            } else {
                text.append("PARTIAL");
            }
            text.append(",count=").append(_dataCount);
            text.append(",meta=").append(_metadata);
            text.append(",uncompr=").append(MatsEagerCacheServer._formatBytes(_receivedUncompressedSize));
            text.append(",compr=").append(MatsEagerCacheServer._formatBytes(_receivedCompressedSize));
            return text.append("]").toString();
        }
    }

    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheReceivedPartialData.class */
    /**
     * A received update that additionally exposes the previous data object. This is the argument type of the
     * partial-update mapper set via {@link MatsEagerCacheClient#setPartialUpdateMapper(Function)}.
     *
     * @param <RECV> element type of the received data.
     * @param <DATA> type of the cache's data object.
     */
    public interface CacheReceivedPartialData<RECV, DATA> extends CacheReceivedData<RECV> {
        /** @return the previous data object - presumably the cache content before this update; TODO confirm. */
        DATA getPreviousData();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheReceivedPartialDataImpl.class */
    /**
     * Value holder for a partial update: everything {@link CacheReceivedDataImpl} carries, plus the previous data
     * object returned from {@link #getPreviousData()}.
     *
     * @param <RECV> element type of the received data stream.
     * @param <DATA> type of the cache's data object.
     */
    public static class CacheReceivedPartialDataImpl<RECV, DATA> extends CacheReceivedDataImpl<RECV> implements CacheReceivedPartialData<RECV, DATA> {
        private final DATA _data;

        /**
         * Note the argument order, which differs from the superclass constructor: the previous data comes second,
         * and the stream comes before the two sizes.
         */
        public CacheReceivedPartialDataImpl(boolean fullUpdate, DATA previousData, int dataCount, String metadata,
                Stream<RECV> receivedStream, long uncompressedSize, long compressedSize) {
            super(fullUpdate, dataCount, metadata, uncompressedSize, compressedSize, receivedStream);
            _data = previousData;
        }

        @Override
        public DATA getPreviousData() {
            return _data;
        }

        /** Same rendering as the superclass, under a different label and with the previous data appended. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CacheReceivedPartialData[");
            if (_fullUpdate) {
                text.append("FULL");
            } else {
                text.append("PARTIAL");
            }
            text.append(",count=").append(_dataCount);
            text.append(",meta=").append(_metadata);
            text.append(",uncompr=").append(MatsEagerCacheServer._formatBytes(_receivedUncompressedSize));
            text.append(",compr=").append(MatsEagerCacheServer._formatBytes(_receivedCompressedSize));
            text.append(", prevData=").append(_data);
            return text.append("]").toString();
        }
    }

    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheUpdated.class */
    /**
     * Update metadata plus the update duration. This is the argument type of the listeners registered via
     * {@link MatsEagerCacheClient#addCacheUpdatedListener(Consumer)}.
     */
    public interface CacheUpdated extends CacheReceived {
        /** @return the duration of the update, in milliseconds going by the name. */
        double getUpdateDurationMillis();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/mats3/util/eagercache/MatsEagerCacheClient$CacheUpdatedImpl.class */
    /**
     * Value holder implementing {@link CacheUpdated}: update metadata plus the update duration. It reuses
     * {@link CacheReceivedDataImpl} with a {@code null} data stream.
     */
    public static class CacheUpdatedImpl extends CacheReceivedDataImpl<Void> implements CacheUpdated {
        private final double _updateDurationMillis;

        /**
         * @param fullUpdate whether the update was a full update.
         * @param dataCount the data count of the update.
         * @param metadata the metadata string of the update.
         * @param uncompressedSize the uncompressed size.
         * @param compressedSize the compressed size.
         * @param updateDurationMillis the value returned by {@link #getUpdateDurationMillis()}.
         */
        public CacheUpdatedImpl(boolean fullUpdate, int dataCount, String metadata, long uncompressedSize,
                long compressedSize, double updateDurationMillis) {
            // There is no payload stream on the "updated" notification, hence null.
            super(fullUpdate, dataCount, metadata, uncompressedSize, compressedSize, null);
            _updateDurationMillis = updateDurationMillis;
        }

        @Override
        public double getUpdateDurationMillis() {
            return _updateDurationMillis;
        }

        /** Same rendering as the superclass, under a different label and with the formatted duration appended. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("CacheUpdatedData[");
            if (_fullUpdate) {
                text.append("FULL");
            } else {
                text.append("PARTIAL");
            }
            text.append(",count=").append(_dataCount);
            text.append(",meta=").append(_metadata);
            text.append(",uncompr=").append(MatsEagerCacheServer._formatBytes(_receivedUncompressedSize));
            text.append(",compr=").append(MatsEagerCacheServer._formatBytes(_receivedCompressedSize));
            text.append(", update:").append(MatsEagerCacheServer._formatMillis(_updateDurationMillis));
            return text.append("]").toString();
        }
    }

    /**
     * Factory method for a cache client. The returned client is only constructed - invoke {@link #start()} to
     * begin receiving updates.
     *
     * @param matsFactory the MatsFactory used for the update subscription and for sending update requests.
     * @param str the data name, from which the broadcast topic and the request queue are derived.
     * @param cls the class of the received elements, used to create the Jackson reader.
     * @param function the mapper turning a received full update into the DATA object.
     * @return the new, not yet started, client.
     */
    public static <RECV, DATA> MatsEagerCacheClient<DATA> create(MatsFactory matsFactory, String str, Class<RECV> cls, Function<CacheReceivedData<RECV>, DATA> function) {
        MatsEagerCacheClient<DATA> client = new MatsEagerCacheClient<>(matsFactory, str, cls, function);
        return client;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <RECV> MatsEagerCacheClient(MatsFactory matsFactory, String str, Class<RECV> cls, Function<CacheReceivedData<RECV>, DATA> function) {
        this._matsFactory = matsFactory;
        this._dataName = str;
        this._fullUpdateMapper = function;
        this._receivedDataTypeReader = FieldBasedJacksonMapper.getMats3DefaultJacksonObjectMapper().readerFor(cls);
        this._receiveSingleBlockingThreadExecutorService = _createSingleThreadedExecutorService("MatsEagerCacheClient-" + this._dataName + "-receiveExecutor");
    }

    /**
     * Sets the mapper used for partial updates.
     *
     * @param function the mapper turning a received partial update (which also exposes the previous data) into
     *            the new DATA object.
     * @return this client, for chaining.
     */
    public <RECV> MatsEagerCacheClient<DATA> setPartialUpdateMapper(Function<CacheReceivedPartialData<RECV, DATA>, DATA> function) {
        // RECV is only known to this method, so the field is declared with a wildcard. A direct assignment between
        // the two parameterizations does not compile, thus the explicit unchecked cast via Function<?, ?>.
        @SuppressWarnings("unchecked")
        Function<CacheReceivedPartialData<?, DATA>, DATA> wildcardMapper = (Function<CacheReceivedPartialData<?, DATA>, DATA>) (Function<?, ?>) function;
        this._partialUpdateMapper = wildcardMapper;
        return this;
    }

    /**
     * Registers a task for the time after initial population. While the pending-task list still exists, the task
     * is appended to it (under this object's monitor); once the list is gone, the task is instead run right away
     * on the calling thread, outside the monitor.
     *
     * @param runnable the task to register or run.
     * @return this client, for chaining.
     */
    public MatsEagerCacheClient<DATA> addAfterInitialPopulationTask(Runnable runnable) {
        // Cheap volatile pre-check, then re-check under the monitor before touching the list.
        if (this._onInitialPopulationTasks != null) {
            synchronized (this) {
                if (this._onInitialPopulationTasks != null) {
                    this._onInitialPopulationTasks.add(runnable);
                    return this;
                }
            }
        }
        // The list is gone: nothing to queue onto, so run it directly.
        runnable.run();
        return this;
    }

    /**
     * Adds a listener to the list of cache-updated listeners.
     *
     * @param consumer the listener to add.
     * @return this client, for chaining.
     */
    public MatsEagerCacheClient<DATA> addCacheUpdatedListener(Consumer<CacheUpdated> consumer) {
        _cacheUpdatedListeners.add(consumer);
        return this;
    }

    /**
     * Returns the cached data. If the initial-population latch is still present, the call first blocks until it
     * is released. The data is then read while holding the cache content read lock, and the access counter is
     * incremented.
     *
     * @return the current data object, never {@code null}.
     * @throws RuntimeException if the calling thread is interrupted while waiting for the initial population; the
     *             thread's interrupt flag is restored before throwing.
     * @throws IllegalStateException if the data object is {@code null}.
     */
    public DATA get() {
        // Read the volatile field exactly once, so that the null-check and the await() see the same reference.
        CountDownLatch initialPopulationLatch = this._initialPopulationLatch;
        if (initialPopulationLatch != null) {
            try {
                initialPopulationLatch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag, so that code further up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Got interrupted while waiting for initial population to be done.", e);
            }
        }
        this._cacheContentReadLock.lock();
        try {
            DATA data = this._data;
            if (data == null) {
                throw new IllegalStateException("The data is null, which the cache API contract explicitly forbids. Fix your cache update code!");
            }
            this._accessCounter.incrementAndGet();
            return data;
        } finally {
            this._cacheContentReadLock.unlock();
        }
    }

    /**
     * Starts the client by delegating to the internal start routine.
     *
     * @return this client, for chaining.
     */
    public MatsEagerCacheClient<DATA> start() {
        MatsEagerCacheClient<DATA> started = _start();
        return started;
    }

    /**
     * Sends an update request carrying the command {@code "MANUAL"}.
     */
    public void requestFullUpdate() {
        final String manualCommand = "MANUAL";
        _sendUpdateRequest(manualCommand);
    }

    /**
     * Shuts the client down: stops the broadcast terminator (passing 30000 - presumably a grace period in
     * milliseconds; TODO confirm), shuts down the receive executor, and clears the running flag. Does nothing if
     * the client is not flagged as running. The whole operation holds this object's monitor.
     */
    public void close() {
        synchronized (this) {
            if (!_running) {
                return;
            }
            _broadcastTerminator.stop(30000);
            _receiveSingleBlockingThreadExecutorService.shutdown();
            _running = false;
        }
    }

    /**
     * Returns a live view of this client's state: every getter on the returned object reads the client's current
     * field or counter value at the time it is invoked.
     *
     * @return a new {@link CacheClientInformation} backed by this client.
     */
    public CacheClientInformation getCacheClientInformation() {
        CacheClientInformation liveView = new CacheClientInformation() {
            @Override
            public String getDataName() {
                return _dataName;
            }

            @Override
            public String getNodename() {
                return _matsFactory.getFactoryConfig().getNodename();
            }

            @Override
            public boolean isRunning() {
                return _running;
            }

            @Override
            public boolean isInitialPopulationDone() {
                // The absence of the latch is what signals "done".
                return _initialPopulationLatch == null;
            }

            @Override
            public long getCacheStartedTimestamp() {
                return _cacheStartedTimestamp;
            }

            @Override
            public long getInitialPopulationRequestSentTimestamp() {
                return _initialPopulationRequestSentTimestamp;
            }

            @Override
            public long getInitialPopulationTimestamp() {
                return _initialPopulationTimestamp;
            }

            @Override
            public long getLastUpdateTimestamp() {
                return _lastUpdateTimestamp;
            }

            @Override
            public double getLastUpdateDurationMillis() {
                return _lastUpdateDurationMillis;
            }

            @Override
            public int getNumberOfFullUpdatesReceived() {
                return _numberOfFullUpdatesReceived.get();
            }

            @Override
            public int getNumberOfPartialUpdatesReceived() {
                return _numberOfPartialUpdatesReceived.get();
            }

            @Override
            public long getNumberOfAccesses() {
                return _accessCounter.get();
            }
        };
        return liveView;
    }

    /**
     * Internal start routine:
     * <ol>
     * <li>Records the start timestamp.</li>
     * <li>Creates the subscription terminator on the broadcast topic for the data name. For the commands
     * {@code "UPDATE_FULL"} and {@code "UPDATE_PARTIAL"}, it fetches the {@code "dataPayload"} bytes and submits
     * the handling to the receive executor. Any other command - including a missing one - is ignored.</li>
     * <li>Starts a daemon thread that waits (up to 600000, i.e. 10 minutes per the log message) for the terminator
     * to be receiving, and then sends the {@code "BOOT"} update request.</li>
     * <li>Flags the client as running.</li>
     * </ol>
     *
     * @return this client.
     */
    private MatsEagerCacheClient<DATA> _start() {
        this._cacheStartedTimestamp = System.currentTimeMillis();
        this._broadcastTerminator = this._matsFactory.subscriptionTerminator(MatsEagerCacheServer._getBroadcastTopic(this._dataName), Void.TYPE, MatsEagerCacheServer.BroadcastDto.class, (processContext, state, broadcastDto) -> {
            // Constant-first equals: a broadcast without a command is ignored instead of throwing NPE.
            String command = broadcastDto.command;
            if (!"UPDATE_FULL".equals(command) && !"UPDATE_PARTIAL".equals(command)) {
                return;
            }
            // The payload is fetched here, on the receiving thread; only the handling is moved to the executor.
            byte[] payload = processContext.getBytes("dataPayload");
            this._receiveSingleBlockingThreadExecutorService.submit(() -> {
                _handleUpdateInExecutorThread(broadcastDto, payload);
            });
        });
        Thread initialRequestThread = new Thread(() -> {
            // Requesting the initial update before the subscription is up would risk missing the reply.
            if (!this._broadcastTerminator.waitForReceiving(600000)) {
                String message = "The Update handler SubscriptionTerminator Endpoint would not start within 10 minutes.";
                log.error(LOG_PREFIX + message);
                throw new IllegalStateException(message);
            }
            this._initialPopulationRequestSentTimestamp = System.currentTimeMillis();
            _sendUpdateRequest("BOOT");
        });
        initialRequestThread.setName("MatsEagerCacheClient-" + this._dataName + "-initialCacheUpdateRequest");
        initialRequestThread.setDaemon(true);
        initialRequestThread.start();
        this._running = true;
        return this;
    }

    /**
     * Creates an executor with exactly one (daemon) worker thread and no buffering queue, where submitting while
     * the worker is busy <i>blocks the submitter</i> until the worker is free to take the task.
     * <p>
     * The blocking is implemented in the rejection handler, which hands the task directly to the worker through
     * the executor's {@link SynchronousQueue}. Once the executor has been shut down no worker will ever take the
     * task, so the handler then throws {@link RejectedExecutionException} rather than blocking the submitter
     * forever. The hand-off is therefore done with a timed {@code offer} in a loop, re-checking for shutdown
     * between attempts.
     *
     * @param str the name given to the worker thread.
     * @return the executor.
     */
    static ThreadPoolExecutor _createSingleThreadedExecutorService(String str) {
        return new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new SynchronousQueue<>(), runnable -> {
            Thread thread = new Thread(runnable, str);
            thread.setDaemon(true);
            return thread;
        }, (task, executor) -> {
            try {
                while (!executor.isShutdown()) {
                    // Timed hand-off, so that a shutdown happening while we wait is noticed.
                    if (executor.getQueue().offer(task, 250L, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting to enqueue task", e);
            }
            throw new RejectedExecutionException("Executor [" + str + "] is shut down, cannot enqueue task");
        });
    }

    /**
     * Sends a cache update request to the request queue for the data name, using the MatsFactory's default
     * initiator. The request carries this node's nodename, the current wall-clock and nano time, and the command.
     *
     * @param str the command to send; {@code "BOOT"} is labelled as the initial request in the trace id and
     *            'from' id, anything else as a manual request.
     * @throws IllegalStateException if the initiation throws; the cause is logged and attached.
     */
    private void _sendUpdateRequest(String str) {
        MatsEagerCacheServer.CacheRequestDto request = new MatsEagerCacheServer.CacheRequestDto();
        request.nodename = this._matsFactory.getFactoryConfig().getNodename();
        request.sentTimestamp = System.currentTimeMillis();
        request.sentNanoTime = System.nanoTime();
        request.command = str;

        // Constant-first equals: cannot throw on a null command.
        String requestKind = "BOOT".equals(str) ? "initialCacheUpdateRequest" : "manualCacheUpdateRequest";
        try {
            this._matsFactory.getDefaultInitiator().initiate(matsInitiate -> {
                matsInitiate.traceId(TraceId.create(this._matsFactory.getFactoryConfig().getAppName(), "MatsEagerCacheClient-" + this._dataName, requestKind))
                        .from("MatsEagerCacheClient." + this._dataName + "." + requestKind)
                        .to(MatsEagerCacheServer._getCacheRequestQueue(this._dataName))
                        .send(request);
            });
        } catch (Exception e) {
            // The message names the actual kind of request: this method also serves manual requests, which the
            // previous wording ("initial cache update request") misreported.
            String message = "Got exception when initiating the cache update request [" + requestKind + "].";
            log.error(LOG_PREFIX + message, e);
            throw new IllegalStateException(message, e);
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:18:0x01ba  */
    /* JADX WARN: Removed duplicated region for block: B:30:0x01ef  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Handler for a received update, invoked on the receive executor with the broadcast DTO and the payload bytes
     * that the subscription terminator (set up in {@code _start()}) fetched from the message.
     * <p>
     * NOTE(review): the decompiler could not reconstruct this method. The body below is a placeholder that
     * unconditionally throws, so with this file as-is, no received update is ever applied. The real logic must be
     * restored from the original source or the bytecode before this class can be used.
     *
     * @param r15 the received broadcast DTO.
     * @param r16 the payload bytes of the update.
     * @throws UnsupportedOperationException always, in this decompiled placeholder.
     */
    private void _handleUpdateInExecutorThread(io.mats3.util.eagercache.MatsEagerCacheServer.BroadcastDto r15, byte[] r16) {
        /*
            Method dump skipped, instructions count: 616
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.mats3.util.eagercache.MatsEagerCacheClient._handleUpdateInExecutorThread(io.mats3.util.eagercache.MatsEagerCacheServer$BroadcastDto, byte[]):void");
    }

    /**
     * Turns the payload bytes into a lazy stream of deserialized elements. The bytes are wrapped in an
     * {@link InflaterInputStreamWithStats}, which is read by the Jackson reader for the received element type.
     * The resulting value iterator is used as the (constant) seed of the stream: the stream keeps going for as
     * long as the iterator has a next value, and each step is mapped to that next value.
     *
     * @param bArr the payload bytes.
     * @return a stream of the deserialized elements.
     * @throws IOException if the reader fails when set up on the payload.
     */
    private Stream<?> _getReceiveStreamFromPayload(byte[] bArr) throws IOException {
        InflaterInputStreamWithStats inflatingInput = new InflaterInputStreamWithStats(bArr);
        return Stream.iterate(this._receivedDataTypeReader.readValues(inflatingInput),
                values -> values.hasNext(),
                UnaryOperator.identity())
                .map(values -> values.next());
    }
}
