/*
 * Decompiled with CFR 0.152.
 */
package org.tentackle.io;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.tentackle.common.Service;
import org.tentackle.common.TentackleRuntimeException;
import org.tentackle.io.ReconnectionPolicy;
import org.tentackle.io.ReconnectorHolder;
import org.tentackle.log.Logger;

/**
 * Re-establishes lost connections according to a {@link ReconnectionPolicy}.
 * <p>
 * A policy is either processed synchronously in the calling thread (blocking policies)
 * or handed over to an executor service and processed in the background.
 * <p>
 * Subclasses may override {@link #createExecutorService()} to provide a different executor
 * and {@link #reconnect(ReconnectionPolicy)} to change the retry strategy.
 */
@Service(value=Reconnector.class)
public class Reconnector {
    private static final Logger LOGGER = Logger.get(Reconnector.class);

    /** Only every LOG_MODULO-th failed attempt is logged to avoid flooding the log. */
    private static final int LOG_MODULO = 1000;

    /** Counter used to give each background thread a unique name. */
    private final AtomicInteger threadNumber = new AtomicInteger();

    /**
     * Executor running the non-blocking reconnections.
     * Must be declared after {@code threadNumber}, since the thread factory created by
     * {@link #createExecutorService()} refers to that field.
     */
    private final ExecutorService executorService = this.createExecutorService();

    /**
     * Gets the reconnector singleton.
     *
     * @return the instance held by {@code ReconnectorHolder}
     */
    public static Reconnector getInstance() {
        return ReconnectorHolder.INSTANCE;
    }

    /**
     * Submits a reconnection.
     * <p>
     * If the policy is blocking, the reconnection is performed in the calling thread and
     * this method returns when the connection is re-established (or throws if the policy aborts).
     * Otherwise the reconnection is scheduled on the executor service and this method
     * returns immediately; a failure of the background task is not reported to the caller.
     *
     * @param policy the reconnection policy
     * @param <T> the type of the connection object
     */
    public <T> void submit(ReconnectionPolicy<T> policy) {
        if (policy.isBlocking()) {
            this.reconnect(policy);
        } else {
            // submit the reconnection itself as the task (not a lambda that merely creates another lambda)
            this.executorService.submit(() -> this.reconnect(policy));
        }
    }

    /**
     * Tries to reconnect until it succeeds or the policy gives up.
     * <p>
     * Before each attempt the policy is asked for the number of milliseconds to wait.
     * A value of zero or less aborts the reconnection. Otherwise the thread sleeps for
     * that time, obtains a new connection from the policy's connector and passes it to the
     * policy's consumer. Any failure of an attempt is logged (throttled) and followed by
     * the next attempt.
     *
     * @param policy the reconnection policy
     * @param <T> the type of the connection object
     * @throws TentackleRuntimeException if the policy aborted the reconnection or the
     *         thread was interrupted while waiting
     */
    protected <T> void reconnect(ReconnectionPolicy<T> policy) {
        int count = 0;
        while (true) {
            long millis = policy.timeToReconnect();
            if (millis <= 0L) {
                throw new TentackleRuntimeException("millis=" + millis + " -> reconnection for " + policy + " aborted");
            }
            try {
                Thread.sleep(millis);
                policy.getConsumer().accept(policy.getConnector().get());
                return;
            }
            catch (InterruptedException ix) {
                // someone wants this thread to stop: restore the flag and give up
                // instead of silently retrying forever
                Thread.currentThread().interrupt();
                throw new TentackleRuntimeException("reconnection for " + policy + " interrupted");
            }
            catch (Throwable t) {
                // the attempt failed: log the first and then every LOG_MODULO-th failure only
                if (count % LOG_MODULO == 0) {
                    LOGGER.severe("reconnection for " + policy + ", attempt " + (count == 0 ? 1 : count) + " failed, keep on trying...", t);
                }
            }
            ++count;
        }
    }

    /**
     * Creates the executor service for non-blocking reconnections.
     * <p>
     * The default implementation returns a cached thread pool whose threads are named
     * {@code "reconnector <n>"}, with {@code n} being a running number.
     * <p>
     * Notice that this method is invoked while the instance is being initialized.
     *
     * @return the executor service
     */
    protected ExecutorService createExecutorService() {
        return Executors.newCachedThreadPool(runnable -> new Thread(runnable, "reconnector " + this.threadNumber.incrementAndGet()));
    }
}

