package com.bigeek.flink.streaming.connectors.ethereum;

import com.bigeek.flink.utils.EthereumWrapper;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;

/* loaded from: input_file:com/bigeek/flink/streaming/connectors/ethereum/EthereumFunctionSource.class */
public class EthereumFunctionSource extends RichSourceFunction<EthBlock> {
    private Integer start;
    private String clientAddress;
    private Long timeoutSeconds;
    private volatile Disposable disposable;
    private Logger logger = LoggerFactory.getLogger(EthereumFunctionSource.class);
    private transient Object waitLock = new Object();

    public EthereumFunctionSource(String str) {
        this.clientAddress = str;
    }

    public EthereumFunctionSource(String str, Integer num) {
        this.clientAddress = str;
        this.start = num;
    }

    public EthereumFunctionSource(String str, Integer num, Long l) {
        this.clientAddress = str;
        this.start = num;
        this.timeoutSeconds = l;
    }

    public EthereumFunctionSource() {
    }

    public void open(Configuration configuration) throws IOException {
        if (StringUtils.isEmpty(this.clientAddress)) {
            this.clientAddress = configuration.getString("web3j.clientAddress", "http://localhost:8545");
        }
        if (this.timeoutSeconds != null) {
            this.timeoutSeconds = Long.valueOf(configuration.getLong("web3j.timeout", this.timeoutSeconds.longValue()));
        }
        Web3j configureInstance = EthereumWrapper.configureInstance(this.clientAddress, this.timeoutSeconds);
        if (this.start == null) {
            this.start = Integer.valueOf(configureInstance.ethGetBlockByNumber(DefaultBlockParameter.valueOf("latest"), false).send().getBlock().getNumber().intValue());
        }
        this.start = Integer.valueOf(configuration.getInteger("web3j.start", this.start.intValue()));
    }

    public void close() {
        if (this.disposable == null || this.disposable.isDisposed()) {
            return;
        }
        this.disposable.dispose();
    }

    public void run(SourceFunction.SourceContext<EthBlock> sourceContext) throws InterruptedException {
        this.waitLock = new Object();
        this.logger.info("Generating subscription with start value {}", this.start);
        Flowable replayPastAndFutureBlocksFlowable = EthereumWrapper.getInstance().replayPastAndFutureBlocksFlowable(DefaultBlockParameter.valueOf(BigInteger.valueOf(this.start.intValue())), true);
        if (Objects.nonNull(this.timeoutSeconds)) {
            replayPastAndFutureBlocksFlowable = replayPastAndFutureBlocksFlowable.timeout(this.timeoutSeconds.longValue(), TimeUnit.SECONDS);
        }
        sourceContext.getClass();
        this.disposable = replayPastAndFutureBlocksFlowable.subscribe((v1) -> {
            r2.collect(v1);
        });
        while (!this.disposable.isDisposed()) {
            synchronized (this.waitLock) {
                this.waitLock.wait(100L);
            }
        }
    }

    public void cancel() {
        if (this.disposable == null || this.disposable.isDisposed()) {
            return;
        }
        this.disposable.dispose();
    }
}
