package io.atomix.copycat.client.session;

import ch.qos.logback.core.CoreConstants;
import io.atomix.catalyst.concurrent.Listener;
import io.atomix.catalyst.concurrent.ThreadContext;
import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.protocol.PublishRequest;
import io.atomix.copycat.protocol.ResetRequest;
import io.atomix.copycat.session.Event;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/copycat-client-1.2.8.jar:io/atomix/copycat/client/session/ClientSessionListener.class */
/**
 * Receives {@link PublishRequest}s on a client connection and dispatches the events they carry
 * to callbacks registered per event name.
 *
 * <p>Callbacks are kept in a {@link ConcurrentHashMap} keyed by event name, each value being a
 * {@link CopyOnWriteArraySet} of consumers. Incoming requests are checked against the session ID
 * and event index held by the {@link ClientSessionState} before being handed to the
 * {@link ClientSequencer}, which decides when the callbacks actually run.
 */
public final class ClientSessionListener {
    private final Connection connection;
    private final ClientSessionState state;
    private final ThreadContext context;
    private final Map<String, Set<Consumer<Object>>> eventListeners = new ConcurrentHashMap<>();
    private final ClientSequencer sequencer;

    /**
     * Creates the listener and registers it as the connection's {@link PublishRequest} handler.
     *
     * @param connection the connection on which publish requests arrive and reset requests are sent
     * @param clientSessionState the session state providing the session ID, event index and logger
     * @param clientSequencer the sequencer through which event callbacks are scheduled
     * @param threadContext the thread context associated with the session
     * @throws NullPointerException if any argument is {@code null}
     */
    public ClientSessionListener(Connection connection, ClientSessionState clientSessionState, ClientSequencer clientSequencer, ThreadContext threadContext) {
        this.connection = Assert.notNull(connection, "connection");
        this.state = Assert.notNull(clientSessionState, "state");
        this.context = Assert.notNull(threadContext, "context");
        this.sequencer = Assert.notNull(clientSequencer, "sequencer");
        connection.handler(PublishRequest.class, this::handlePublish);
    }

    /**
     * Registers a callback that is run, ignoring the event message, whenever the named event is
     * dispatched.
     *
     * @param str the event name to listen for
     * @param runnable the callback to run for each matching event
     * @return a listener handle whose {@code close()} unregisters the callback
     * @throws NullPointerException if {@code str} or {@code runnable} is {@code null}
     */
    public Listener<Void> onEvent(String str, Runnable runnable) {
        // Fail at registration time; a null callback would otherwise only blow up during dispatch.
        Assert.notNull(runnable, "callback");
        return onEvent(str, message -> runnable.run());
    }

    /**
     * Registers a callback that receives the message of every dispatched event with the given name.
     *
     * @param str the event name to listen for
     * @param consumer the callback invoked with each matching event's message
     * @param <T> the message type the caller expects
     * @return a listener handle; {@code accept} forwards to the callback and {@code close()}
     *     removes the callback from the set registered under {@code str}
     * @throws NullPointerException if {@code str} or {@code consumer} is {@code null}
     */
    public <T> Listener<T> onEvent(String str, final Consumer consumer) {
        Assert.notNull(str, "event");
        // The raw callback is stored as Consumer<Object>: messages are untyped at dispatch time,
        // so the caller is trusted to register a consumer matching the published message type.
        @SuppressWarnings("unchecked")
        final Consumer<Object> callback = (Consumer<Object>) Assert.notNull(consumer, "callback");

        final Set<Consumer<Object>> listeners =
                this.eventListeners.computeIfAbsent(str, name -> new CopyOnWriteArraySet<>());
        listeners.add(callback);

        return new Listener<T>() {
            @Override
            public void accept(T t) {
                callback.accept(t);
            }

            @Override
            public void close() {
                listeners.remove(callback);
            }
        };
    }

    /**
     * Validates an incoming publish request and, if it is the next one expected, schedules its
     * events for dispatch.
     *
     * <ul>
     *   <li>A request for a different session ID is logged and ignored.</li>
     *   <li>A request whose event index is not greater than the current one is ignored.</li>
     *   <li>A request whose previous index differs from the current event index triggers a
     *       {@link ResetRequest} carrying the current event index.</li>
     *   <li>Otherwise the event index is advanced and the request is passed to the sequencer.</li>
     * </ul>
     *
     * @param publishRequest the request received from the connection
     */
    private void handlePublish(PublishRequest publishRequest) {
        this.state.getLogger().trace("{} - Received {}", Long.valueOf(this.state.getSessionId()), publishRequest);

        if (publishRequest.session() != this.state.getSessionId()) {
            this.state.getLogger().trace("{} - Inconsistent session ID: {}", Long.valueOf(this.state.getSessionId()), Long.valueOf(publishRequest.session()));
            return;
        }

        // Already-seen (or older) event batch: nothing to do.
        if (publishRequest.eventIndex() <= this.state.getEventIndex()) {
            return;
        }

        // The request does not follow directly on the last index seen; send the current index back.
        if (publishRequest.previousIndex() != this.state.getEventIndex()) {
            this.state.getLogger().trace("{} - Inconsistent event index: {}", Long.valueOf(this.state.getSessionId()), Long.valueOf(publishRequest.previousIndex()));
            this.connection.send(ResetRequest.builder().withSession(this.state.getSessionId()).withIndex(this.state.getEventIndex()).build());
            return;
        }

        // The index is advanced before sequencing, exactly as the callbacks are deferred to the sequencer.
        this.state.setEventIndex(publishRequest.eventIndex());
        this.sequencer.sequenceEvent(publishRequest, () -> dispatchEvents(publishRequest));
    }

    /** Invokes every callback registered under each event's name with that event's message. */
    private void dispatchEvents(PublishRequest publishRequest) {
        for (Event<?> event : publishRequest.events()) {
            Set<Consumer<Object>> listeners = this.eventListeners.get(event.name());
            if (listeners == null) {
                continue;
            }
            for (Consumer<Object> listener : listeners) {
                listener.accept(event.message());
            }
        }
    }

    /**
     * Closes the listener.
     *
     * @return an already-completed future; no callbacks or handlers are unregistered here
     */
    public CompletableFuture<Void> close() {
        return CompletableFuture.completedFuture(null);
    }
}
