/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.remote.observable;

import io.mantisrx.common.network.WritableEndpoint;
import io.reactivex.mantis.remote.observable.MutableReference;
import io.reactivex.mantis.remote.observable.RemoteRxEvent;
import io.reactivex.mantis.remote.observable.slotting.SlottingStrategy;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.channel.ObservableConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;

public class SafeWriter {
    static final Logger logger = LoggerFactory.getLogger(SafeWriter.class);
    private static final AtomicLong checkIsOpenCounter = new AtomicLong();
    private static final int CHECK_IS_OPEN_INTERVAL = 1000;

    <T> boolean safeWrite(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, List<RemoteRxEvent> events, MutableReference<Subscription> subReference, Action0 onSuccessfulWriteCallback, Action1<Throwable> onFailedWriteCallback, SlottingStrategy<T> slottingStrategyReference, WritableEndpoint<T> endpoint) {
        boolean writeSuccess = true;
        if (checkIsOpenCounter.getAndIncrement() % 1000L == 0L) {
            if (!connection.isCloseIssued() && connection.getChannel().isActive()) {
                writeSuccess = this.checkWriteableAndWrite(connection, events, onSuccessfulWriteCallback, onFailedWriteCallback);
            } else {
                writeSuccess = false;
                logger.warn("Detected closed or inactive client connection, force unsubscribe.");
                subReference.getValue().unsubscribe();
                if (slottingStrategyReference != null) {
                    logger.info("Removing slot for endpoint: " + endpoint);
                    if (!slottingStrategyReference.removeConnection(endpoint)) {
                        logger.error("Failed to remove endpoint from slot,  endpoint: " + endpoint);
                    }
                }
            }
        } else {
            writeSuccess = this.checkWriteableAndWrite(connection, events, onSuccessfulWriteCallback, onFailedWriteCallback);
        }
        return writeSuccess;
    }

    private boolean checkWriteableAndWrite(ObservableConnection<RemoteRxEvent, List<RemoteRxEvent>> connection, List<RemoteRxEvent> events, Action0 onSuccessfulWriteCallback, Action1<Throwable> onFailedWriteCallback) {
        boolean writeSuccess = true;
        if (connection.getChannel().isWritable()) {
            connection.writeAndFlush(events).doOnError(onFailedWriteCallback).doOnCompleted(onSuccessfulWriteCallback).subscribe();
        } else {
            writeSuccess = false;
        }
        return writeSuccess;
    }
}

