package io.getmedusa.medusa.core.router.action;

import io.getmedusa.diffengine.model.ServerSideDiff;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/getmedusa/medusa/core/router/action/SocketSink.class */
public class SocketSink {
    private final EventProcessor eventProcessor = new EventProcessor(this) { // from class: io.getmedusa.medusa.core.router.action.SocketSink.1
        private EventListener<Set<ServerSideDiff>> eventListener;
        private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        @Override // io.getmedusa.medusa.core.router.action.SocketSink.EventProcessor
        public void register(EventListener<Set<ServerSideDiff>> eventListener) {
            this.eventListener = eventListener;
        }

        @Override // io.getmedusa.medusa.core.router.action.SocketSink.EventProcessor
        public void dataChunk(Set<ServerSideDiff> set) {
            this.executor.schedule(() -> {
                this.eventListener.onDataChunk(set);
            }, 0L, TimeUnit.MILLISECONDS);
        }

        @Override // io.getmedusa.medusa.core.router.action.SocketSink.EventProcessor
        public void processComplete() {
            ScheduledExecutorService scheduledExecutorService = this.executor;
            EventListener<Set<ServerSideDiff>> eventListener = this.eventListener;
            Objects.requireNonNull(eventListener);
            scheduledExecutorService.schedule(eventListener::processComplete, 500L, TimeUnit.MILLISECONDS);
        }
    };
    Flux<Set<ServerSideDiff>> eventFlux = Flux.create(fluxSink -> {
        this.eventProcessor.register(new EventListener<Set<ServerSideDiff>>(this) { // from class: io.getmedusa.medusa.core.router.action.SocketSink.2
            @Override // io.getmedusa.medusa.core.router.action.SocketSink.EventListener
            public void onDataChunk(Set<ServerSideDiff> set) {
                fluxSink.next(set);
            }

            @Override // io.getmedusa.medusa.core.router.action.SocketSink.EventListener
            public void processComplete() {
                fluxSink.complete();
            }
        });
    });

    /* loaded from: input_file:io/getmedusa/medusa/core/router/action/SocketSink$EventListener.class */
    interface EventListener<T> {
        void onDataChunk(T t);

        void processComplete();
    }

    /* loaded from: input_file:io/getmedusa/medusa/core/router/action/SocketSink$EventProcessor.class */
    interface EventProcessor {
        void register(EventListener<Set<ServerSideDiff>> eventListener);

        void dataChunk(Set<ServerSideDiff> set);

        void processComplete();
    }

    public void push(Set<ServerSideDiff> set) {
        this.eventProcessor.dataChunk(set);
    }

    public Flux<Set<ServerSideDiff>> asFlux() {
        return this.eventFlux;
    }

    public SocketSink() {
        this.eventProcessor.dataChunk(new LinkedHashSet());
    }
}
