package org.opendaylight.controller.cluster.messaging;

import akka.actor.ActorRef;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.opendaylight.controller.cluster.io.FileBackedOutputStreamFactory;
import org.opendaylight.yangtools.concepts.Identifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opendaylight/controller/cluster/messaging/MessageAssembler.class */
/**
 * Re-assembles messages that arrive as a sequence of {@link MessageSlice} instances and hands each
 * fully re-assembled message to a caller-supplied callback.
 *
 * <p>Partial state is kept per message {@link Identifier} in a cache whose entries expire after a
 * configurable period without access. State is closed whenever it leaves the cache, whether through
 * expiration, explicit removal, or {@link #close()}.
 */
public final class MessageAssembler implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(MessageAssembler.class);

    private final Cache<Identifier, AssembledMessageState> stateCache;
    private final FileBackedOutputStreamFactory fileBackedStreamFactory;
    private final BiConsumer<Object, ActorRef> assembledMessageCallback;
    private final String logContext;

    /**
     * Builder for {@link MessageAssembler} instances. The stream factory and the assembled-message
     * callback are mandatory; the inactivity expiration defaults to 1 minute and the log context
     * defaults to {@code "<no-context>"}.
     */
    public static class Builder {
        private FileBackedOutputStreamFactory fileBackedStreamFactory;
        private BiConsumer<Object, ActorRef> assembledMessageCallback;
        private long expireStateAfterInactivityDuration = 1;
        private TimeUnit expireStateAfterInactivityUnit = TimeUnit.MINUTES;
        private String logContext = "<no-context>";

        /**
         * Sets the factory passed to each newly created {@code AssembledMessageState}.
         *
         * @param fileBackedOutputStreamFactory the factory, must not be null
         * @return this builder
         * @throws NullPointerException if the factory is null
         */
        public Builder fileBackedStreamFactory(FileBackedOutputStreamFactory fileBackedOutputStreamFactory) {
            this.fileBackedStreamFactory = Objects.requireNonNull(fileBackedOutputStreamFactory);
            return this;
        }

        /**
         * Sets the callback invoked with each re-assembled message and the slice's reply-to actor.
         * Nullness is verified when {@link #build()} is called.
         *
         * @param biConsumer the callback
         * @return this builder
         */
        public Builder assembledMessageCallback(BiConsumer<Object, ActorRef> biConsumer) {
            this.assembledMessageCallback = biConsumer;
            return this;
        }

        /**
         * Sets how long partial state may go without being accessed before it is expired.
         *
         * @param j the duration, must be greater than zero
         * @param timeUnit the unit of the duration, must not be null
         * @return this builder
         * @throws IllegalArgumentException if the duration is not positive
         * @throws NullPointerException if the unit is null
         */
        public Builder expireStateAfterInactivity(long j, TimeUnit timeUnit) {
            Preconditions.checkArgument(j > 0, "duration must be > 0");
            // Fail here rather than later in build(), where a null unit would surface far from the caller.
            this.expireStateAfterInactivityUnit = Objects.requireNonNull(timeUnit, "timeUnit cannot be null");
            this.expireStateAfterInactivityDuration = j;
            return this;
        }

        /**
         * Sets the prefix used in this assembler's log messages.
         *
         * @param str the log context
         * @return this builder
         */
        public Builder logContext(String str) {
            this.logContext = str;
            return this;
        }

        /**
         * Creates the assembler from the current settings.
         *
         * @return a new {@link MessageAssembler}
         * @throws NullPointerException if the stream factory or the callback has not been set
         */
        public MessageAssembler build() {
            return new MessageAssembler(this);
        }
    }

    MessageAssembler(Builder builder) {
        this.fileBackedStreamFactory = Objects.requireNonNull(builder.fileBackedStreamFactory,
                "FiledBackedStreamFactory cannot be null");
        this.assembledMessageCallback = Objects.requireNonNull(builder.assembledMessageCallback,
                "assembledMessageCallback cannot be null");
        this.logContext = builder.logContext;
        this.stateCache = CacheBuilder.newBuilder()
                .expireAfterAccess(builder.expireStateAfterInactivityDuration, builder.expireStateAfterInactivityUnit)
                .removalListener(this::stateRemoved)
                .build();
    }

    /**
     * Returns a new {@link Builder}.
     *
     * @return the builder
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Checks whether the given message is of a type that {@link #handleMessage(Object, ActorRef)} processes.
     *
     * @param obj the message to check
     * @return true if the message is a {@link MessageSlice} or an {@link AbortSlicing}
     */
    public static boolean isHandledMessage(Object obj) {
        return obj instanceof MessageSlice || obj instanceof AbortSlicing;
    }

    /**
     * Invalidates all cached state; the removal listener closes each removed entry.
     */
    @Override
    public void close() {
        LOG.debug("{}: Closing", logContext);
        stateCache.invalidateAll();
    }

    /**
     * Runs the cache's pending maintenance if the cache is non-empty, giving expired state a chance
     * to be removed (and closed) even when no other cache activity occurs.
     */
    public void checkExpiredAssembledMessageState() {
        if (stateCache.size() > 0) {
            stateCache.cleanUp();
        }
    }

    /**
     * Processes a {@link MessageSlice} or {@link AbortSlicing} message.
     *
     * @param obj the message to process
     * @param actorRef the actor reference included in replies sent back to the slice's reply-to actor
     * @return true if the message was handled, false if it is of any other type
     */
    public boolean handleMessage(Object obj, ActorRef actorRef) {
        if (obj instanceof MessageSlice) {
            MessageSlice messageSlice = (MessageSlice) obj;
            LOG.debug("{}: handleMessage: {}", logContext, messageSlice);
            onMessageSlice(messageSlice, actorRef);
            return true;
        }
        if (obj instanceof AbortSlicing) {
            AbortSlicing abortSlicing = (AbortSlicing) obj;
            LOG.debug("{}: handleMessage: {}", logContext, abortSlicing);
            onAbortSlicing(abortSlicing);
            return true;
        }
        return false;
    }

    /**
     * Looks up (or creates) the state for the slice's identifier and adds the slice to it. If state
     * cannot be created, a failed reply is sent to the slice's reply-to actor.
     */
    private void onMessageSlice(MessageSlice messageSlice, ActorRef actorRef) {
        Identifier identifier = messageSlice.getIdentifier();
        try {
            AssembledMessageState state = stateCache.get(identifier, () -> createState(messageSlice));
            processMessageSliceForState(messageSlice, state, actorRef);
        } catch (ExecutionException e) {
            // The cache wraps the loader's exception; pass a MessageSliceException through unchanged.
            Throwable cause = e.getCause();
            MessageSliceException failure = cause instanceof MessageSliceException
                    ? (MessageSliceException) cause
                    : new MessageSliceException(
                            String.format("Error creating state for identifier %s", identifier), cause);
            messageSlice.getReplyTo().tell(MessageSliceReply.failed(identifier, failure, actorRef),
                    ActorRef.noSender());
        }
    }

    /**
     * Creates new state for a message. Only the first slice (index 1) may create state; any other
     * index means the earlier state is missing, which is reported as a failure.
     *
     * @throws MessageSliceException if the slice is not the first one
     */
    private AssembledMessageState createState(MessageSlice messageSlice) throws MessageSliceException {
        Identifier identifier = messageSlice.getIdentifier();
        if (messageSlice.getSliceIndex() == 1) {
            LOG.debug("{}: Received first slice for {} - creating AssembledMessageState", logContext, identifier);
            return new AssembledMessageState(identifier, messageSlice.getTotalSlices(), fileBackedStreamFactory,
                    logContext);
        }

        LOG.debug("{}: AssembledMessageState not found for {} - returning failed reply", logContext, identifier);
        throw new MessageSliceException(String.format(
                "No assembled state found for identifier %s and slice index %s", identifier,
                messageSlice.getSliceIndex()), true);
    }

    /**
     * Adds the slice to the given state and replies to the sender. When the last slice has been
     * added the message is re-assembled, the state is removed, and the callback is notified.
     */
    private void processMessageSliceForState(MessageSlice messageSlice, AssembledMessageState assembledMessageState,
            ActorRef actorRef) {
        Identifier identifier = messageSlice.getIdentifier();
        ActorRef replyTo = messageSlice.getReplyTo();
        Object reAssembledMessage = null;
        synchronized (assembledMessageState) {
            int sliceIndex = messageSlice.getSliceIndex();
            try {
                MessageSliceReply successReply = MessageSliceReply.success(identifier, sliceIndex, actorRef);
                if (assembledMessageState.addSlice(sliceIndex, messageSlice.getData(),
                        messageSlice.getLastSliceHashCode())) {
                    LOG.debug("{}: Received last slice for {}", logContext, identifier);
                    // Re-assemble before replying so a re-assembly failure yields a failed reply instead.
                    reAssembledMessage = reAssembleMessage(assembledMessageState);
                    replyTo.tell(successReply, ActorRef.noSender());
                    removeState(identifier);
                } else {
                    LOG.debug("{}: Added slice for {} - expecting more", logContext, identifier);
                    replyTo.tell(successReply, ActorRef.noSender());
                }
            } catch (MessageSliceException e) {
                LOG.warn("{}: Error processing {}", logContext, messageSlice, e);
                replyTo.tell(MessageSliceReply.failed(identifier, e, actorRef), ActorRef.noSender());
                removeState(identifier);
            }
        }

        // The callback is invoked outside the synchronized block.
        if (reAssembledMessage != null) {
            LOG.debug("{}: Notifying callback of re-assembled message {}", logContext, reAssembledMessage);
            assembledMessageCallback.accept(reAssembledMessage, replyTo);
        }
    }

    /**
     * Deserializes the assembled bytes of the given state back into the original message object.
     *
     * @throws MessageSliceException if the bytes cannot be read or the message class cannot be found
     */
    private static Object reAssembleMessage(AssembledMessageState assembledMessageState)
            throws MessageSliceException {
        // NOTE(review): this is native Java deserialization of bytes received from a sender; if senders
        // are not fully trusted, consider restricting classes with an ObjectInputFilter.
        // try-with-resources ensures the stream is closed even when readObject() throws.
        try (ObjectInputStream in = new ObjectInputStream(assembledMessageState.getAssembledBytes().openStream())) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new MessageSliceException(String.format("Error re-assembling bytes for identifier %s",
                    assembledMessageState.getIdentifier()), e);
        }
    }

    /** Discards any partial state held for the aborted message. */
    private void onAbortSlicing(AbortSlicing abortSlicing) {
        removeState(abortSlicing.getIdentifier());
    }

    /** Invalidates the cache entry for the identifier; the removal listener closes the state. */
    private void removeState(Identifier identifier) {
        LOG.debug("{}: Removing state for {}", logContext, identifier);
        stateCache.invalidate(identifier);
    }

    /** Cache removal listener: logs why the entry left the cache and closes the removed state. */
    private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> removalNotification) {
        if (removalNotification.wasEvicted()) {
            LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext,
                    removalNotification.getKey());
        } else {
            LOG.debug("{}: AssembledMessageState for {} was removed from the cache due to {}", logContext,
                    removalNotification.getKey(), removalNotification.getCause());
        }

        removalNotification.getValue().close();
    }

    /**
     * Reports whether state is currently cached for the identifier, then runs cache maintenance.
     *
     * @param identifier the message identifier to look up
     * @return true if state was present at the time of the lookup
     */
    @VisibleForTesting
    boolean hasState(Identifier identifier) {
        boolean present = stateCache.getIfPresent(identifier) != null;
        stateCache.cleanUp();
        return present;
    }
}
