package org.fabric3.binding.zeromq.runtime.message;

import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.fabric3.api.annotation.management.Management;
import org.fabric3.api.annotation.management.ManagementOperation;
import org.fabric3.api.annotation.management.OperationType;
import org.fabric3.api.binding.zeromq.model.ZeroMQMetadata;
import org.fabric3.binding.zeromq.runtime.SocketAddress;
import org.fabric3.binding.zeromq.runtime.context.ContextManager;
import org.fabric3.spi.container.channel.EventStreamHandler;
import org.fabric3.spi.discovery.ChannelEntry;
import org.fabric3.spi.discovery.EntryChange;
import org.zeromq.ZMQ;

@Management
/* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableSubscriber.class */
public class NonReliableSubscriber implements Subscriber, BiConsumer<EntryChange, ChannelEntry> {
    private static final byte[] EMPTY_BYTES = new byte[0];
    private String id;
    private ContextManager manager;
    private List<SocketAddress> addresses;
    private EventStreamHandler handler;
    private ZeroMQMetadata metadata;
    private ExecutorService executorService;
    private SocketReceiver receiver;
    private long timeout;
    private String socketId = getClass().getName() + ":" + UUID.randomUUID();
    private AtomicInteger connectionCount = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/fabric3/binding/zeromq/runtime/message/NonReliableSubscriber$SocketReceiver.class */
    public class SocketReceiver implements Runnable {
        private ZMQ.Socket socket;
        private ZMQ.Socket controlSocket;
        private ZMQ.Poller poller;
        private AtomicBoolean active = new AtomicBoolean(true);
        private AtomicBoolean doRefresh = new AtomicBoolean(true);

        SocketReceiver() {
        }

        public void refresh() {
            this.doRefresh.set(true);
        }

        public synchronized void stop() {
            this.active.set(false);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v41, types: [byte[], java.lang.Object] */
        /* JADX WARN: Type inference failed for: r0v46, types: [byte[]] */
        @Override // java.lang.Runnable
        public void run() {
            while (this.active.get()) {
                try {
                    reconnect();
                    if (this.poller.poll(NonReliableSubscriber.this.timeout) > 0) {
                        if (this.controlSocket.recv(1) != null) {
                            closeSocket();
                            return;
                        }
                        byte[][] bArr = (byte[][]) null;
                        byte[] recv = this.socket.recv(0);
                        int i = 1;
                        while (this.socket.hasReceiveMore()) {
                            if (bArr == null) {
                                bArr = new byte[]{recv};
                            } else {
                                ?? r0 = new byte[bArr.length + 1];
                                System.arraycopy(bArr, 0, r0, 0, bArr.length);
                                bArr = r0;
                            }
                            bArr[i] = this.socket.recv(0);
                            i++;
                        }
                        if (bArr == null) {
                            NonReliableSubscriber.this.handler.handle(recv, true);
                        } else {
                            NonReliableSubscriber.this.handler.handle(bArr, true);
                        }
                    }
                } catch (RuntimeException e) {
                    NonReliableSubscriber.this.executorService.submit(this);
                    throw e;
                }
            }
            closeSocket();
        }

        private synchronized void reconnect() {
            if (this.doRefresh.getAndSet(false)) {
                closeSocket();
                NonReliableSubscriber.this.manager.reserve(NonReliableSubscriber.this.socketId);
                ZMQ.Context context = NonReliableSubscriber.this.manager.getContext();
                this.socket = context.socket(2);
                SocketHelper.configure(this.socket, NonReliableSubscriber.this.metadata);
                this.socket.subscribe(NonReliableSubscriber.EMPTY_BYTES);
                Iterator it = NonReliableSubscriber.this.addresses.iterator();
                while (it.hasNext()) {
                    this.socket.connect(((SocketAddress) it.next()).toProtocolString());
                }
                this.controlSocket = NonReliableSubscriber.this.manager.createControlSocket();
                this.poller = context.poller();
                this.poller.register(this.controlSocket, 1);
                this.poller.register(this.socket, 1);
            }
        }

        private void closeSocket() {
            if (this.socket != null) {
                try {
                    this.socket.close();
                } finally {
                    NonReliableSubscriber.this.manager.release(NonReliableSubscriber.this.socketId);
                }
            }
            if (this.controlSocket != null) {
                this.controlSocket.close();
            }
        }
    }

    public NonReliableSubscriber(String str, ContextManager contextManager, List<SocketAddress> list, EventStreamHandler eventStreamHandler, ZeroMQMetadata zeroMQMetadata, ExecutorService executorService) {
        this.id = str;
        this.manager = contextManager;
        this.addresses = list;
        this.handler = eventStreamHandler;
        this.metadata = zeroMQMetadata;
        this.executorService = executorService;
        long timeout = zeroMQMetadata.getTimeout();
        if (timeout < 0) {
            this.timeout = timeout;
        } else {
            this.timeout = TimeUnit.MILLISECONDS.toMicros(timeout);
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    @ManagementOperation(type = OperationType.POST)
    public void start() {
        if (this.receiver == null) {
            this.receiver = new SocketReceiver();
            this.executorService.submit(this.receiver);
        }
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    @ManagementOperation(type = OperationType.POST)
    public void stop() {
        try {
            this.receiver.stop();
        } finally {
            this.receiver = null;
        }
    }

    @ManagementOperation
    public List<String> getAddresses() {
        return (List) this.addresses.stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList());
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public void incrementConnectionCount() {
        this.connectionCount.incrementAndGet();
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public void decrementConnectionCount() {
        this.connectionCount.decrementAndGet();
    }

    @Override // org.fabric3.binding.zeromq.runtime.message.Subscriber
    public boolean hasConnections() {
        return this.connectionCount.get() > 0;
    }

    public String getId() {
        return this.id;
    }

    @Override // java.util.function.BiConsumer
    public void accept(EntryChange entryChange, ChannelEntry channelEntry) {
        this.addresses = AddressUpdater.accept(entryChange, channelEntry, this.addresses);
        if (this.receiver != null) {
            this.receiver.refresh();
        }
    }
}
