package pl.grzeslowski.jsupla.server.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.grzeslowski.jsupla.protocol.api.calltypes.CallTypeParser;
import pl.grzeslowski.jsupla.protocol.api.decoders.DecoderFactory;
import pl.grzeslowski.jsupla.protocol.api.encoders.EncoderFactory;
import pl.grzeslowski.jsupla.protocol.api.structs.SuplaDataPacket;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:pl/grzeslowski/jsupla/server/netty/SuplaHandler.class */
/**
 * Netty inbound handler that republishes every received {@link SuplaDataPacket} through a
 * Reactor {@link Flux}.
 *
 * <p>When the channel is registered, each sink in {@code rootEmitters} is handed a new
 * {@link NettyChannel} wrapping this handler's packet stream. Channel life-cycle events are
 * translated into stream signals: packets become {@code next}, channel deactivation or
 * unregistration becomes {@code complete}, and a pipeline exception becomes {@code error}.
 */
final class SuplaHandler extends SimpleChannelInboundHandler<SuplaDataPacket> implements Publisher<SuplaDataPacket>, AutoCloseable {
    private final Collection<FluxSink<NettyChannel>> rootEmitters;
    private final CallTypeParser callTypeParser;
    private final DecoderFactory decoderFactory;
    private final EncoderFactory encoderFactory;
    private final Flux<SuplaDataPacket> flux;
    private final Disposable disposable;
    /** Sinks currently registered by the {@code Flux.create} callback; each removes itself on dispose. */
    private final Collection<FluxSink<SuplaDataPacket>> emitters = Collections.synchronizedList(new LinkedList<>());
    private final Logger logger = LoggerFactory.getLogger(SuplaHandler.class.getName() + "#" + hashCode());

    /**
     * Creates the handler and immediately connects its internal published flux.
     *
     * @param collection     sinks that receive a new {@link NettyChannel} when the channel is registered
     * @param callTypeParser passed through to every created {@link NettyChannel}
     * @param decoderFactory passed through to every created {@link NettyChannel}
     * @param encoderFactory passed through to every created {@link NettyChannel}
     */
    public SuplaHandler(Collection<FluxSink<NettyChannel>> collection, CallTypeParser callTypeParser, DecoderFactory decoderFactory, EncoderFactory encoderFactory) {
        this.logger.debug("New instance");
        this.rootEmitters = collection;
        this.callTypeParser = callTypeParser;
        this.decoderFactory = decoderFactory;
        this.encoderFactory = encoderFactory;
        ConnectableFlux<SuplaDataPacket> publish = Flux.<SuplaDataPacket>create(fluxSink -> {
            this.emitters.add(fluxSink);
            // Deregister the sink once it is disposed so it stops receiving channel events.
            fluxSink.onDispose(() -> this.emitters.remove(fluxSink));
        }).publish();
        this.disposable = publish.connect();
        this.flux = publish;
    }

    /** Logs the removal and delegates to the superclass. */
    @Override
    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.logger.debug("SuplaHandler.handlerRemoved(ctx)");
        super.handlerRemoved(channelHandlerContext);
    }

    /** Delegates to the superclass, then emits a new {@link NettyChannel} to every root sink. */
    @Override
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
        this.rootEmitters.forEach(fluxSink -> fluxSink.next(newNettyChannel(channelHandlerContext, this.flux)));
    }

    private NettyChannel newNettyChannel(ChannelHandlerContext channelHandlerContext, Flux<SuplaDataPacket> flux) {
        return new NettyChannel(channelHandlerContext, flux, this.callTypeParser, this.decoderFactory, this.encoderFactory);
    }

    /** Subscribes {@code subscriber} to the packet stream of this handler. */
    @Override
    public void subscribe(Subscriber<? super SuplaDataPacket> subscriber) {
        this.flux.subscribe(subscriber);
    }

    /** Delegates to the superclass, then completes every registered sink. */
    @Override
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.logger.debug("SuplaHandler.channelUnregistered(ctx), emitters.size={}", Integer.valueOf(this.emitters.size()));
        super.channelUnregistered(channelHandlerContext);
        snapshotEmitters().forEach(FluxSink::complete);
    }

    /** Delegates to the superclass, then signals {@code th} as an error to every registered sink. */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        // A lone trailing Throwable is consumed by SLF4J as the exception and would leave "{}"
        // unfilled, so the placeholder gets its own argument and the stack trace is kept.
        this.logger.debug("SuplaHandler.exceptionCaught(ctx, {})", String.valueOf(th), th);
        super.exceptionCaught(channelHandlerContext, th);
        snapshotEmitters().forEach(fluxSink -> fluxSink.error(th));
    }

    /** Delegates to the superclass, then completes every registered sink. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.logger.debug("SuplaHandler.channelInactive(ctx), emitters.size={}", Integer.valueOf(this.emitters.size()));
        super.channelInactive(channelHandlerContext);
        snapshotEmitters().forEach(FluxSink::complete);
    }

    /** Forwards the received packet to every registered sink and flushes the context. */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, SuplaDataPacket suplaDataPacket) throws Exception {
        this.logger.trace("SuplaHandler.channelRead0(ctx, {})", suplaDataPacket);
        snapshotEmitters().forEach(fluxSink -> fluxSink.next(suplaDataPacket));
        channelHandlerContext.flush();
    }

    /** Forgets all registered sinks and disposes the connection to the published flux. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.logger.debug("Closing SuplaHandler");
        this.emitters.clear();
        this.disposable.dispose();
    }

    /**
     * Returns a copy of the currently registered sinks.
     *
     * <p>Signalling a sink may run its dispose callback, which removes the sink from
     * {@code emitters}; iterating a copy keeps that removal from disturbing the iteration and
     * avoids holding the list's lock while the sinks are being signalled.
     */
    private Collection<FluxSink<SuplaDataPacket>> snapshotEmitters() {
        return new LinkedList<>(this.emitters);
    }
}
