package org.opendaylight.controller.cluster.messaging;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.opendaylight.controller.cluster.io.FileBackedOutputStream;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opendaylight/controller/cluster/messaging/MessageSlicer.class */
public class MessageSlicer implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageSlicer.class);
    private static final AtomicLong SLICER_ID_COUNTER = new AtomicLong(1);
    public static final int DEFAULT_MAX_SLICING_TRIES = 3;
    private final Cache<MessageSliceIdentifier, SlicedMessageState<ActorRef>> stateCache;
    private final FileBackedOutputStreamFactory fileBackedStreamFactory;
    private final int messageSliceSize;
    private final int maxSlicingTries;
    private final String logContext;
    private final long id = SLICER_ID_COUNTER.getAndIncrement();

    /* loaded from: input_file:org/opendaylight/controller/cluster/messaging/MessageSlicer$Builder.class */
    public static class Builder {
        private FileBackedOutputStreamFactory fileBackedStreamFactory;
        private int messageSliceSize = -1;
        private long expireStateAfterInactivityDuration = -1;
        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
        private int maxSlicingTries = 3;
        private String logContext = "<no-context>";

        public Builder fileBackedStreamFactory(FileBackedOutputStreamFactory fileBackedOutputStreamFactory) {
            this.fileBackedStreamFactory = (FileBackedOutputStreamFactory) Objects.requireNonNull(fileBackedOutputStreamFactory);
            return this;
        }

        public Builder messageSliceSize(int i) {
            Preconditions.checkArgument(i > 0, "messageSliceSize must be > 0");
            this.messageSliceSize = i;
            return this;
        }

        public Builder maxSlicingTries(int i) {
            Preconditions.checkArgument(i > 0, "newMaxSlicingTries must be > 0");
            this.maxSlicingTries = i;
            return this;
        }

        public Builder expireStateAfterInactivity(long j, TimeUnit timeUnit) {
            Preconditions.checkArgument(j > 0, "duration must be > 0");
            this.expireStateAfterInactivityDuration = j;
            this.expireStateAfterInactivityUnit = timeUnit;
            return this;
        }

        public Builder logContext(String str) {
            this.logContext = (String) Objects.requireNonNull(str);
            return this;
        }

        public MessageSlicer build() {
            return new MessageSlicer(this);
        }
    }

    MessageSlicer(Builder builder) {
        this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
        this.messageSliceSize = builder.messageSliceSize;
        this.maxSlicingTries = builder.maxSlicingTries;
        this.logContext = builder.logContext + "_slicer-id-" + this.id;
        CacheBuilder removalListener = CacheBuilder.newBuilder().removalListener(this::stateRemoved);
        this.stateCache = (builder.expireStateAfterInactivityDuration > 0 ? removalListener.expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit) : removalListener).build();
    }

    @VisibleForTesting
    long getId() {
        return this.id;
    }

    public static Builder builder() {
        return new Builder();
    }

    public static boolean isHandledMessage(Object obj) {
        return obj instanceof MessageSliceReply;
    }

    public boolean slice(SliceOptions sliceOptions) {
        FileBackedOutputStream fileBackedStream;
        Identifier identifier = sliceOptions.getIdentifier();
        Serializable message = sliceOptions.getMessage();
        if (message != null) {
            LOG.debug("{}: slice: identifier: {}, message: {}", new Object[]{this.logContext, identifier, message});
            Objects.requireNonNull(this.fileBackedStreamFactory, "The FiledBackedStreamFactory must be set in order to call this slice method");
            fileBackedStream = this.fileBackedStreamFactory.newInstance();
            try {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(fileBackedStream);
                try {
                    objectOutputStream.writeObject(message);
                    objectOutputStream.close();
                } finally {
                }
            } catch (IOException e) {
                LOG.debug("{}: Error serializing message for {}", new Object[]{this.logContext, identifier, e});
                fileBackedStream.cleanup();
                sliceOptions.getOnFailureCallback().accept(e);
                return false;
            }
        } else {
            fileBackedStream = sliceOptions.getFileBackedStream();
        }
        return initializeSlicing(sliceOptions, fileBackedStream);
    }

    private boolean initializeSlicing(SliceOptions sliceOptions, FileBackedOutputStream fileBackedOutputStream) {
        Identifier identifier = sliceOptions.getIdentifier();
        MessageSliceIdentifier messageSliceIdentifier = new MessageSliceIdentifier(identifier, this.id);
        AutoCloseable autoCloseable = null;
        try {
            SlicedMessageState slicedMessageState = new SlicedMessageState(messageSliceIdentifier, fileBackedOutputStream, this.messageSliceSize, this.maxSlicingTries, sliceOptions.getReplyTo(), sliceOptions.getOnFailureCallback(), this.logContext);
            Serializable message = sliceOptions.getMessage();
            if (slicedMessageState.getTotalSlices() == 1 && message != null) {
                LOG.debug("{}: Message does not need to be sliced - sending original message", this.logContext);
                slicedMessageState.close();
                sendTo(sliceOptions, message, sliceOptions.getReplyTo());
                return false;
            }
            MessageSlice nextSliceMessage = getNextSliceMessage(slicedMessageState);
            LOG.debug("{}: Sending first slice: {}", this.logContext, nextSliceMessage);
            this.stateCache.put(messageSliceIdentifier, slicedMessageState);
            sendTo(sliceOptions, nextSliceMessage, ActorRef.noSender());
            return true;
        } catch (IOException e) {
            LOG.error("{}: Error initializing SlicedMessageState for {}", new Object[]{this.logContext, identifier, e});
            if (0 != 0) {
                autoCloseable.close();
            } else {
                fileBackedOutputStream.cleanup();
            }
            sliceOptions.getOnFailureCallback().accept(e);
            return false;
        }
    }

    private static void sendTo(SliceOptions sliceOptions, Object obj, ActorRef actorRef) {
        if (sliceOptions.getSendToRef() != null) {
            sliceOptions.getSendToRef().tell(obj, actorRef);
        } else {
            sliceOptions.getSendToSelection().tell(obj, actorRef);
        }
    }

    public boolean handleMessage(Object obj) {
        if (!(obj instanceof MessageSliceReply)) {
            return false;
        }
        LOG.debug("{}: handleMessage: {}", this.logContext, obj);
        return onMessageSliceReply((MessageSliceReply) obj);
    }

    public void checkExpiredSlicedMessageState() {
        if (this.stateCache.size() > 0) {
            this.stateCache.cleanUp();
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        LOG.debug("{}: Closing", this.logContext);
        this.stateCache.invalidateAll();
    }

    public void cancelSlicing(Predicate<Identifier> predicate) {
        this.stateCache.asMap().keySet().removeIf(messageSliceIdentifier -> {
            return predicate.test(messageSliceIdentifier.getClientIdentifier());
        });
    }

    private static MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> slicedMessageState) throws IOException {
        return new MessageSlice(slicedMessageState.getIdentifier(), slicedMessageState.getNextSlice(), slicedMessageState.getCurrentSliceIndex(), slicedMessageState.getTotalSlices(), slicedMessageState.getLastSliceHashCode(), slicedMessageState.getReplyTarget());
    }

    private boolean onMessageSliceReply(MessageSliceReply messageSliceReply) {
        Optional<MessageSliceException> failure;
        Identifier identifier = messageSliceReply.getIdentifier();
        if (!(identifier instanceof MessageSliceIdentifier) || ((MessageSliceIdentifier) identifier).getSlicerId() != this.id) {
            return false;
        }
        SlicedMessageState<ActorRef> slicedMessageState = (SlicedMessageState) this.stateCache.getIfPresent(identifier);
        if (slicedMessageState == null) {
            LOG.warn("{}: SlicedMessageState not found for {}", this.logContext, messageSliceReply);
            messageSliceReply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
            return true;
        }
        synchronized (slicedMessageState) {
            try {
                failure = messageSliceReply.getFailure();
            } catch (IOException e) {
                LOG.warn("{}: Error processing {}", new Object[]{this.logContext, messageSliceReply, e});
                fail(slicedMessageState, e);
            }
            if (failure.isPresent()) {
                LOG.warn("{}: Received failed {}", this.logContext, messageSliceReply);
                processMessageSliceException(failure.get(), slicedMessageState, messageSliceReply.getSendTo());
                return true;
            }
            if (slicedMessageState.getCurrentSliceIndex() != messageSliceReply.getSliceIndex()) {
                LOG.warn("{}: Slice index {} in {} does not match expected index {}", new Object[]{this.logContext, Integer.valueOf(messageSliceReply.getSliceIndex()), messageSliceReply, Integer.valueOf(slicedMessageState.getCurrentSliceIndex())});
                messageSliceReply.getSendTo().tell(new AbortSlicing(identifier), ActorRef.noSender());
                possiblyRetrySlicing(slicedMessageState, messageSliceReply.getSendTo());
                return true;
            }
            if (slicedMessageState.isLastSlice(messageSliceReply.getSliceIndex())) {
                LOG.debug("{}: Received last slice reply for {}", this.logContext, identifier);
                removeState(identifier);
            } else {
                MessageSlice nextSliceMessage = getNextSliceMessage(slicedMessageState);
                LOG.debug("{}: Sending next slice: {}", this.logContext, nextSliceMessage);
                messageSliceReply.getSendTo().tell(nextSliceMessage, ActorRef.noSender());
            }
            return true;
        }
    }

    private void processMessageSliceException(MessageSliceException messageSliceException, SlicedMessageState<ActorRef> slicedMessageState, ActorRef actorRef) throws IOException {
        if (messageSliceException.isRetriable()) {
            possiblyRetrySlicing(slicedMessageState, actorRef);
        } else {
            fail(slicedMessageState, messageSliceException.getCause() != null ? messageSliceException.getCause() : messageSliceException);
        }
    }

    private void possiblyRetrySlicing(SlicedMessageState<ActorRef> slicedMessageState, ActorRef actorRef) throws IOException {
        if (slicedMessageState.canRetry()) {
            LOG.info("{}: Retrying message slicing for {}", this.logContext, slicedMessageState.getIdentifier());
            slicedMessageState.reset();
            actorRef.tell(getNextSliceMessage(slicedMessageState), ActorRef.noSender());
        } else {
            String format = String.format("Maximum slicing retries reached for identifier %s - failing the message", slicedMessageState.getIdentifier());
            LOG.warn(format);
            fail(slicedMessageState, new RuntimeException(format));
        }
    }

    private void removeState(Identifier identifier) {
        LOG.debug("{}: Removing state for {}", this.logContext, identifier);
        this.stateCache.invalidate(identifier);
    }

    private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> removalNotification) {
        SlicedMessageState slicedMessageState = (SlicedMessageState) removalNotification.getValue();
        slicedMessageState.close();
        if (!removalNotification.wasEvicted()) {
            LOG.debug("{}: SlicedMessageState for {} was removed from the cache due to {}", new Object[]{this.logContext, removalNotification.getKey(), removalNotification.getCause()});
        } else {
            LOG.warn("{}: SlicedMessageState for {} was expired from the cache", this.logContext, removalNotification.getKey());
            slicedMessageState.getOnFailureCallback().accept(new RuntimeException(String.format("The slicing state for message identifier %s was expired due to inactivity from the assembling component on the other end", slicedMessageState.getIdentifier())));
        }
    }

    private void fail(SlicedMessageState<ActorRef> slicedMessageState, Throwable th) {
        removeState(slicedMessageState.getIdentifier());
        slicedMessageState.getOnFailureCallback().accept(th);
    }

    @VisibleForTesting
    boolean hasState(Identifier identifier) {
        boolean z = this.stateCache.getIfPresent(identifier) != null;
        this.stateCache.cleanUp();
        return z;
    }
}
