/*
 * Decompiled with CFR 0.152.
 */
package com.bigeek.flink.streaming.connectors.ethereum;

import com.bigeek.flink.utils.EthereumWrapper;
import java.io.IOException;
import java.math.BigInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import rx.Subscription;

public class EthereumFunctionSource
extends RichSourceFunction<EthBlock> {
    private Logger logger = LoggerFactory.getLogger(EthereumFunctionSource.class);
    private Integer start;
    private String clientAddress;
    private Long timeoutSeconds;
    private transient Subscription subscription;

    public EthereumFunctionSource(String clientAddress, Integer start) {
        this.clientAddress = clientAddress;
        this.start = start;
    }

    public EthereumFunctionSource(String clientAddress, Integer start, Long timeoutSeconds) {
        this.clientAddress = clientAddress;
        this.start = start;
        this.timeoutSeconds = timeoutSeconds;
    }

    public EthereumFunctionSource() {
    }

    public void open(Configuration parameters) throws IOException {
        if (StringUtils.isEmpty((CharSequence)this.clientAddress)) {
            this.clientAddress = parameters.getString("web3j.clientAddress", "http://localhost:8545");
        }
        if (this.timeoutSeconds != null) {
            this.timeoutSeconds = parameters.getLong("web3j.timeout", this.timeoutSeconds.longValue());
        }
        Web3j web3j = EthereumWrapper.configureInstance(this.clientAddress, this.timeoutSeconds);
        if (this.start == null) {
            this.start = ((EthBlock)web3j.ethGetBlockByNumber(DefaultBlockParameter.valueOf((String)"latest"), false).send()).getBlock().getNumber().intValue();
        }
        this.start = parameters.getInteger("web3j.start", this.start.intValue());
    }

    public void close() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            this.subscription.unsubscribe();
        }
    }

    public void run(SourceFunction.SourceContext<EthBlock> sourceContext) {
        this.logger.info("Generating subscription with start value {}", (Object)this.start);
        this.subscription = EthereumWrapper.getInstance().catchUpToLatestAndSubscribeToNewBlocksObservable(DefaultBlockParameter.valueOf((BigInteger)BigInteger.valueOf(this.start.intValue())), true).subscribe(arg_0 -> sourceContext.collect(arg_0));
    }

    public void cancel() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            this.subscription.unsubscribe();
        }
    }
}

