package io.sapl.interpreter.pip;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.sapl.api.interpreter.Val;
import io.sapl.api.pip.Attribute;
import io.sapl.api.pip.PolicyInformationPoint;
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.web3j.abi.FunctionReturnDecoder;
import org.web3j.abi.datatypes.Function;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.methods.response.Transaction;
import org.web3j.protocol.exceptions.ClientConnectionException;
import org.web3j.protocol.http.HttpService;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Policy Information Point that exposes data of an Ethereum node as SAPL attributes.
 *
 * <p>With the exception of {@code pendingTransactions}, every attribute is implemented as a
 * polling stream: a {@link Callable} performing one web3j request is re-evaluated on a fixed
 * interval by {@link #scheduledFlux(Callable, Map)} and only changed results are emitted.
 * The interval defaults to {@value #DEFAULT_ETH_POLLING_INTERVAL} ms and may be overridden
 * through the {@code ethPollingInterval} entry of the {@code ethPipConfig} variable.
 */
@PolicyInformationPoint(name = "ethereum", description = "Connects to the Ethereum Blockchain.")
public class EthereumPolicyInformationPoint {
    private static final String ETH_PIP_CONFIG = "ethPipConfig";
    private static final long DEFAULT_ETH_POLLING_INTERVAL = 5000;
    private static final String ADDRESS = "address";
    private static final String CONTRACT_ADDRESS = "contractAddress";
    private static final String TRANSACTION_HASH = "transactionHash";
    private static final String FROM_ACCOUNT = "fromAccount";
    private static final String TO_ACCOUNT = "toAccount";
    private static final String TRANSACTION_VALUE = "transactionValue";
    private static final String INPUT_PARAMS = "inputParams";
    private static final String OUTPUT_PARAMS = "outputParams";
    private static final String FUNCTION_NAME = "functionName";
    private static final String POSITION = "position";
    private static final String BLOCK_HASH = "blockHash";
    private static final String SHA3_HASH_OF_DATA_TO_SIGN = "sha3HashOfDataToSign";
    private static final String TRANSACTION = "transaction";
    private static final String RETURN_FULL_TRANSACTION_OBJECTS = "returnFullTransactionObjects";
    private static final String TRANSACTION_INDEX = "transactionIndex";
    private static final String UNCLE_INDEX = "uncleIndex";
    private static final String FILTER_ID = "filterId";
    private static final String DEFAULT_BLOCK_PARAMETER = "defaultBlockParameter";
    private static final String VERIFY_TRANSACTION_WARNING = "There was an error during verifyTransaction. By default false is returned but the transaction could have taken place.";
    private static final String ETH_POLLING_INTERVAL = "ethPollingInterval";

    /** Client used for every request issued by this PIP. */
    private final Web3j web3j;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(EthereumPolicyInformationPoint.class);

    /** Shared mapper used to turn plain values into {@link JsonNode}s. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Creates a PIP whose client is built from a default-constructed {@link HttpService}.
     */
    public EthereumPolicyInformationPoint() {
        this(Web3j.build(new HttpService()));
    }

    /**
     * Creates a PIP that talks to the node through the given client.
     *
     * @param web3j the client to use for all requests
     */
    public EthereumPolicyInformationPoint(Web3j web3j) {
        this.web3j = web3j;
    }

    /**
     * Polls whether the transaction described by {@code val} exists with the stated sender,
     * recipient and value.
     *
     * @param val object providing {@code transactionHash}, {@code fromAccount},
     *            {@code toAccount} and {@code transactionValue}
     * @param map variables, consulted for the polling interval
     * @return stream of {@code Val.TRUE} / {@code Val.FALSE}
     */
    @Attribute(name = TRANSACTION, docs = "Returns true, if a transaction has taken place and false otherwise.")
    public Flux<Val> verifyTransaction(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withVerifiedTransaction(val), map);
    }

    /**
     * Builds the single-shot check behind {@link #verifyTransaction(Val, Map)}.
     *
     * <p>I/O problems, connection problems and {@code NullPointerException}s (for instance a
     * transaction without a recipient) are logged and answered with {@code Val.FALSE}; other
     * exceptions propagate to the caller.
     */
    private Callable<Val> withVerifiedTransaction(Val val) {
        return () -> {
            try {
                JsonNode request = val.get();
                String hash = EthereumBasicFunctions.getStringFrom(request, TRANSACTION_HASH);
                Optional<Transaction> found = this.web3j.ethGetTransactionByHash(hash).send().getTransaction();
                if (found.isPresent() && matchesRequest(found.get(), request)) {
                    return Val.TRUE;
                }
            } catch (IOException | NullPointerException | ClientConnectionException e) {
                // Deliberately best-effort: report "not verified", but keep the cause in the log.
                log.warn(VERIFY_TRANSACTION_WARNING, e);
            }
            return Val.FALSE;
        };
    }

    /** Tells whether sender, recipient and value of {@code onChain} equal those named in {@code request}. */
    private static boolean matchesRequest(Transaction onChain, JsonNode request) {
        return onChain.getFrom().equalsIgnoreCase(EthereumBasicFunctions.getStringFrom(request, FROM_ACCOUNT))
                && onChain.getTo().equalsIgnoreCase(EthereumBasicFunctions.getStringFrom(request, TO_ACCOUNT))
                && onChain.getValue().equals(EthereumBasicFunctions.getBigIntFrom(request, TRANSACTION_VALUE));
    }

    /**
     * Polls the decoded result of a contract function call.
     *
     * @param val object providing {@code fromAccount}, {@code contractAddress},
     *            {@code functionName}, {@code inputParams}, {@code outputParams} and
     *            {@code defaultBlockParameter}
     * @param map variables, consulted for the polling interval
     * @return stream of decoded call results
     */
    @Attribute(name = "contract", docs = "Returns the result of a function call of a specified contract.")
    public Flux<Val> loadContractInformation(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withInformationFromContract(val), map);
    }

    /** Builds the call behind {@link #loadContractInformation(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withInformationFromContract(Val val) {
        JsonNode request = val.get();
        return () -> {
            String fromAccount = EthereumBasicFunctions.getStringFrom(request, FROM_ACCOUNT);
            String contractAddress = EthereumBasicFunctions.getStringFrom(request, CONTRACT_ADDRESS);
            String functionName = EthereumBasicFunctions.getStringFrom(request, FUNCTION_NAME);
            JsonNode inputParams = EthereumBasicFunctions.getJsonFrom(request, INPUT_PARAMS);
            JsonNode outputParams = EthereumBasicFunctions.getJsonFrom(request, OUTPUT_PARAMS);
            JsonNode blockParameter = EthereumBasicFunctions.getJsonFrom(request, DEFAULT_BLOCK_PARAMETER);
            Function function = EthereumPipFunctions.createFunction(functionName, inputParams, outputParams);
            String encodedFunction = EthereumPipFunctions.createEncodedFunction(function);
            // Fully qualified: the request-side Transaction clashes with the imported response-side one.
            org.web3j.protocol.core.methods.request.Transaction callTransaction =
                    org.web3j.protocol.core.methods.request.Transaction.createEthCallTransaction(fromAccount, contractAddress, encodedFunction);
            String rawResult = this.web3j.ethCall(callTransaction, EthereumPipFunctions.getDefaultBlockParameter(blockParameter)).send().getValue();
            return EthereumBasicFunctions.toVal(FunctionReturnDecoder.decode(rawResult, function.getOutputParameters()));
        };
    }

    /**
     * Polls the client version reported by the node.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "clientVersion", docs = "Returns the current client version.")
    public Flux<Val> web3ClientVersion(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withWeb3ClientVersion(), map);
    }

    private Callable<Val> withWeb3ClientVersion() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.web3ClientVersion().send().getWeb3ClientVersion());
    }

    /**
     * Polls the node's {@code web3Sha3} result for the given data.
     *
     * @param val textual value holding the data to hash
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "sha3", docs = "Returns Keccak-256 (not the standardized SHA3-256) of the given data.")
    public Flux<Val> web3Sha3(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withWeb3Sha3(val), map);
    }

    private Callable<Val> withWeb3Sha3(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.web3Sha3(val.get().textValue()).send().getResult());
    }

    /**
     * Polls the network id.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "netVersion", docs = "Returns the current network id.")
    public Flux<Val> netVersion(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withNetVersion(), map);
    }

    private Callable<Val> withNetVersion() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.netVersion().send().getNetVersion());
    }

    /**
     * Polls whether the client is listening for network connections.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "listening", docs = "Returns true if client is actively listening for network connections.")
    public Flux<Val> netListening(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withNetListening(), map);
    }

    private Callable<Val> withNetListening() {
        return () -> EthereumBasicFunctions.toVal(Boolean.valueOf(this.web3j.netListening().send().isListening()));
    }

    /**
     * Polls the number of connected peers.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "peerCount", docs = "Returns number of peers currently connected to the client.")
    public Flux<Val> netPeerCount(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withNetPeerCount(), map);
    }

    private Callable<Val> withNetPeerCount() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.netPeerCount().send().getQuantity());
    }

    /**
     * Polls the Ethereum protocol version.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "protocolVersion", docs = "Returns the current ethereum protocol version.")
    public Flux<Val> ethProtocolVersion(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthProtocolVersion(), map);
    }

    private Callable<Val> withEthProtocolVersion() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethProtocolVersion().send().getProtocolVersion());
    }

    /**
     * Polls whether the client is syncing.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "syncing", docs = "Returns true if the client is syncing or false otherwise.")
    public Flux<Val> ethSyncing(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthSyncing(), map);
    }

    private Callable<Val> withEthSyncing() {
        return () -> EthereumBasicFunctions.toVal(Boolean.valueOf(this.web3j.ethSyncing().send().isSyncing()));
    }

    /**
     * Polls the coinbase address of the client.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "coinbase", docs = "Returns the client coinbase address.")
    public Flux<Val> ethCoinbase(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthCoinbase(), map);
    }

    private Callable<Val> withEthCoinbase() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethCoinbase().send().getResult());
    }

    /**
     * Polls whether the client is mining.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "mining", docs = "Returns true if client is actively mining new blocks.")
    public Flux<Val> ethMining(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthMining(), map);
    }

    private Callable<Val> withEthMining() {
        return () -> EthereumBasicFunctions.toVal(Boolean.valueOf(this.web3j.ethMining().send().isMining()));
    }

    /**
     * Polls the node's hash rate.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "hashrate", docs = "Returns the number of hashes per second that the node is mining with.")
    public Flux<Val> ethHashrate(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthHashrate(), map);
    }

    private Callable<Val> withEthHashrate() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethHashrate().send().getHashrate());
    }

    /**
     * Polls the current gas price.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "gasPrice", docs = "Returns the current price per gas in wei.")
    public Flux<Val> ethGasPrice(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthGasPrice(), map);
    }

    private Callable<Val> withEthGasPrice() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethGasPrice().send().getGasPrice());
    }

    /**
     * Polls the accounts known to the client.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "accounts", docs = "Returns a list of addresses owned by client.")
    public Flux<Val> ethAccounts(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthAccounts(), map);
    }

    private Callable<Val> withEthAccounts() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethAccounts().send().getAccounts());
    }

    /**
     * Polls the latest block number.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "blockNumber", docs = "Returns the number of most recent block.")
    public Flux<Val> ethBlockNumber(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEthBlockNumber(), map);
    }

    private Callable<Val> withEthBlockNumber() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethBlockNumber().send().getBlockNumber());
    }

    /**
     * Polls the balance of an account.
     *
     * @param val object providing {@code address}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "balance", docs = "Returns the balance of the account of given address.")
    public Flux<Val> ethGetBalance(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withAccountBalance(val), map);
    }

    /** Builds the call behind {@link #ethGetBalance(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withAccountBalance(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetBalance(EthereumBasicFunctions.getStringFrom(request, ADDRESS), EthereumPipFunctions.getDefaultBlockParameter(request))
                .send().getBalance());
    }

    /**
     * Polls the value stored at a storage position of an address.
     *
     * @param val object providing {@code address} and {@code position}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "storage", docs = "Returns the value from a storage position at a given address.")
    public Flux<Val> ethGetStorageAt(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withStorageAt(val), map);
    }

    /** Builds the call behind {@link #ethGetStorageAt(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withStorageAt(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetStorageAt(EthereumBasicFunctions.getStringFrom(request, ADDRESS), request.get(POSITION).bigIntegerValue(), EthereumPipFunctions.getDefaultBlockParameter(request))
                .send().getData());
    }

    /**
     * Polls the transaction count of an address.
     *
     * @param val object providing {@code address}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "transactionCount", docs = "Returns the number of transactions sent from an address.")
    public Flux<Val> ethGetTransactionCount(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withTransactionCount(val), map);
    }

    /** Builds the call behind {@link #ethGetTransactionCount(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withTransactionCount(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetTransactionCount(EthereumBasicFunctions.getStringFrom(request, ADDRESS), EthereumPipFunctions.getDefaultBlockParameter(request))
                .send().getTransactionCount());
    }

    /**
     * Polls the transaction count of the block with the given hash.
     *
     * @param val object providing {@code blockHash}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "blockTransactionCountByHash", docs = "Returns the number of transactions in a block from a block matching the given block hash.")
    public Flux<Val> ethGetBlockTransactionCountByHash(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withBlockTransactionCountByHash(val), map);
    }

    private Callable<Val> withBlockTransactionCountByHash(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetBlockTransactionCountByHash(EthereumBasicFunctions.getStringFrom(val.get(), BLOCK_HASH))
                .send().getTransactionCount());
    }

    /**
     * Polls the transaction count of the block selected by the block parameter.
     *
     * @param val value handed to {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "blockTransactionCountByNumber", docs = "Returns the number of transactions in a block matching the given block number.")
    public Flux<Val> ethGetBlockTransactionCountByNumber(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withBlockTransactionCountByNumber(val), map);
    }

    private Callable<Val> withBlockTransactionCountByNumber(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetBlockTransactionCountByNumber(EthereumPipFunctions.getDefaultBlockParameter(val.get()))
                .send().getTransactionCount());
    }

    /**
     * Polls the uncle count of the block with the given hash.
     *
     * @param val object providing {@code blockHash}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "uncleCountByBlockHash", docs = "Returns the number of uncles in a block from a block matching the given block hash.")
    public Flux<Val> ethGetUncleCountByBlockHash(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withUncleCountByBlockHash(val), map);
    }

    private Callable<Val> withUncleCountByBlockHash(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetUncleCountByBlockHash(EthereumBasicFunctions.getStringFrom(val.get(), BLOCK_HASH))
                .send().getUncleCount());
    }

    /**
     * Polls the uncle count of the block selected by the block parameter.
     *
     * @param val value handed to {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "uncleCountByBlockNumber", docs = "Returns the number of uncles in a block from a block matching the given block number.")
    public Flux<Val> ethGetUncleCountByBlockNumber(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withUncleCountByBlockNumber(val), map);
    }

    private Callable<Val> withUncleCountByBlockNumber(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetUncleCountByBlockNumber(EthereumPipFunctions.getDefaultBlockParameter(val.get()))
                .send().getUncleCount());
    }

    /**
     * Polls the code stored at an address.
     *
     * @param val object providing {@code address}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "code", docs = "Returns code at a given address.")
    public Flux<Val> ethGetCode(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withCode(val), map);
    }

    /** Builds the call behind {@link #ethGetCode(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withCode(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetCode(EthereumBasicFunctions.getStringFrom(request, ADDRESS), EthereumPipFunctions.getDefaultBlockParameter(request))
                .send().getCode());
    }

    /**
     * Polls the signature the node computes for the given hash.
     *
     * @param val object providing {@code address} and {@code sha3HashOfDataToSign}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "sign", docs = "The sign method calculates an Ethereum specific signature.")
    public Flux<Val> ethSign(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withSignature(val), map);
    }

    /** Builds the call behind {@link #ethSign(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withSignature(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethSign(EthereumBasicFunctions.getStringFrom(request, ADDRESS), EthereumBasicFunctions.getStringFrom(request, SHA3_HASH_OF_DATA_TO_SIGN))
                .send().getSignature());
    }

    /**
     * Polls the result of a message call.
     *
     * @param val object providing {@code transaction}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "call", docs = "Executes a new message call immediately without creating a transaction on the block chain.")
    public Flux<Val> ethCall(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withCallResult(val), map);
    }

    /** Builds the call behind {@link #ethCall(Val, Map)}; {@code val} is unwrapped immediately. */
    private Callable<Val> withCallResult(Val val) {
        JsonNode request = val.get();
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethCall(EthereumPipFunctions.getTransactionFromJson(request.get(TRANSACTION)), EthereumPipFunctions.getDefaultBlockParameter(request))
                .send().getValue());
    }

    /**
     * Polls the gas estimate for a transaction.
     *
     * @param val object providing {@code transaction}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "estimateGas", docs = "Generates and returns an estimate of how much gas is necessary to allow the transaction to complete.")
    public Flux<Val> ethEstimateGas(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withEstimatedGas(val), map);
    }

    private Callable<Val> withEstimatedGas(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethEstimateGas(EthereumPipFunctions.getTransactionFromJson(val.get().get(TRANSACTION)))
                .send().getAmountUsed());
    }

    /**
     * Polls the block with the given hash.
     *
     * @param val object providing {@code blockHash} and {@code returnFullTransactionObjects};
     *            unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "blockByHash", docs = "Returns information about a block by hash.")
    public Flux<Val> ethGetBlockByHash(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withBlockByHash(val.get()), map);
    }

    private Callable<Val> withBlockByHash(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetBlockByHash(EthereumBasicFunctions.getStringFrom(request, BLOCK_HASH), EthereumBasicFunctions.getBooleanFrom(request, RETURN_FULL_TRANSACTION_OBJECTS))
                .send().getBlock());
    }

    /**
     * Polls the block selected by the block parameter.
     *
     * @param val object providing {@code returnFullTransactionObjects}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "blockByNumber", docs = "Returns information about a block by block number.")
    public Flux<Val> ethGetBlockByNumber(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withBlockByNumber(val.get()), map);
    }

    private Callable<Val> withBlockByNumber(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetBlockByNumber(EthereumPipFunctions.getDefaultBlockParameter(request), EthereumBasicFunctions.getBooleanFrom(request, RETURN_FULL_TRANSACTION_OBJECTS))
                .send().getBlock());
    }

    /**
     * Polls the transaction with the given hash.
     *
     * @param val textual value holding the transaction hash
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "transactionByHash", docs = "Returns the information about a transaction requested by transaction hash.")
    public Flux<Val> ethGetTransactionByHash(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withTransactionByHash(val), map);
    }

    private Callable<Val> withTransactionByHash(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethGetTransactionByHash(val.get().textValue()).send().getResult());
    }

    /**
     * Polls a transaction addressed by block hash and index.
     *
     * @param val object providing {@code blockHash} and {@code transactionIndex};
     *            unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "transactionByBlockHashAndIndex", docs = "Returns information about a transaction by block hash and transaction index position.")
    public Flux<Val> ethGetTransactionByBlockHashAndIndex(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withTransactionByBlockHashAndIndex(val.get()), map);
    }

    private Callable<Val> withTransactionByBlockHashAndIndex(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetTransactionByBlockHashAndIndex(EthereumBasicFunctions.getStringFrom(request, BLOCK_HASH), EthereumBasicFunctions.getBigIntFrom(request, TRANSACTION_INDEX))
                .send().getResult());
    }

    /**
     * Polls a transaction addressed by block parameter and index.
     *
     * @param val object providing {@code transactionIndex}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "transactionByBlockNumberAndIndex", docs = "Returns information about a transaction by block number and transaction index position.")
    public Flux<Val> ethGetTransactionByBlockNumberAndIndex(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withTransactionByBlockNumberAndIndex(val.get()), map);
    }

    private Callable<Val> withTransactionByBlockNumberAndIndex(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetTransactionByBlockNumberAndIndex(EthereumPipFunctions.getDefaultBlockParameter(request), EthereumBasicFunctions.getBigIntFrom(request, TRANSACTION_INDEX))
                .send().getResult());
    }

    /**
     * Polls the receipt of the transaction with the given hash.
     *
     * @param val textual value holding the transaction hash; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "transactionReceipt", docs = "Returns the receipt of a transaction by transaction hash.")
    public Flux<Val> ethGetTransactionReceipt(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withTransactionReceipt(val.get()), map);
    }

    private Callable<Val> withTransactionReceipt(JsonNode transactionHash) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethGetTransactionReceipt(transactionHash.textValue()).send().getResult());
    }

    /**
     * Streams pending transaction hashes as delivered by the client's own flowable.
     *
     * <p>Unlike the other attributes this one is not polled through
     * {@link #scheduledFlux(Callable, Map)}; neither parameter is read.
     *
     * @param val not used
     * @param map not used
     */
    @Attribute(name = "pendingTransactions", docs = "Returns the pending transactions list.")
    public Flux<Val> ethPendingTransactions(Val val, Map<String, JsonNode> map) {
        return Flux.from(this.web3j.ethPendingTransactionHashFlowable()
                .map(hash -> MAPPER.convertValue(hash, JsonNode.class))
                .map(Val::of));
    }

    /**
     * Polls an uncle addressed by block hash and uncle index.
     *
     * @param val object providing {@code blockHash} and {@code uncleIndex}; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "uncleByBlockHashAndIndex", docs = "Returns information about a uncle of a block by hash and uncle index position.")
    public Flux<Val> ethGetUncleByBlockHashAndIndex(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withUncleByBlockHashAndIndex(val.get()), map);
    }

    private Callable<Val> withUncleByBlockHashAndIndex(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetUncleByBlockHashAndIndex(EthereumBasicFunctions.getStringFrom(request, BLOCK_HASH), EthereumBasicFunctions.getBigIntFrom(request, UNCLE_INDEX))
                .send().getBlock());
    }

    /**
     * Polls an uncle addressed by block parameter and uncle index.
     *
     * @param val object providing {@code uncleIndex}; also handed to
     *            {@code EthereumPipFunctions.getDefaultBlockParameter}; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "uncleByBlockNumberAndIndex", docs = "Returns information about a uncle of a block by number and uncle index position.")
    public Flux<Val> ethGetUncleByBlockNumberAndIndex(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withUncleByBlockNumberAndIndex(val.get()), map);
    }

    private Callable<Val> withUncleByBlockNumberAndIndex(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetUncleByBlockNumberAndIndex(EthereumPipFunctions.getDefaultBlockParameter(request), EthereumBasicFunctions.getBigIntFrom(request, UNCLE_INDEX))
                .send().getBlock());
    }

    /**
     * Polls the changes of the filter with the given id.
     *
     * @param val object providing {@code filterId}; unwrapped immediately
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "ethFilterChanges", docs = "Returns an array of logs which occurred since last poll.")
    public Flux<Val> ethGetFilterChanges(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withFilterChanges(val.get()), map);
    }

    private Callable<Val> withFilterChanges(JsonNode request) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetFilterChanges(EthereumBasicFunctions.getBigIntFrom(request, FILTER_ID))
                .send().getLogs());
    }

    /**
     * Polls all logs of the filter with the given id.
     *
     * @param val object providing {@code filterId}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "ethFilterLogs", docs = "Returns an array of all logs matching filter with given id.")
    public Flux<Val> ethGetFilterLogs(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withFilterLogs(val), map);
    }

    private Callable<Val> withFilterLogs(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetFilterLogs(EthereumBasicFunctions.getBigIntFrom(val.get(), FILTER_ID))
                .send().getLogs());
    }

    /**
     * Polls all logs matching a filter object.
     *
     * @param val value handed to {@code EthereumPipFunctions.getEthFilterFrom}
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "logs", docs = "Returns an array of all logs matching a given filter object.")
    public Flux<Val> ethGetLogs(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withLogs(val), map);
    }

    private Callable<Val> withLogs(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j
                .ethGetLogs(EthereumPipFunctions.getEthFilterFrom(val.get()))
                .send().getLogs());
    }

    /**
     * Polls the node's {@code ethGetWork} result.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "work", docs = "Returns the hash of the current block, the seedHash, and the boundary condition to be met (\"target\").")
    public Flux<Val> ethGetWork(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withWork(), map);
    }

    private Callable<Val> withWork() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.ethGetWork().send().getResult());
    }

    /**
     * Polls the whisper protocol version.
     *
     * @param val not used
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "shhVersion", docs = "Returns the current whisper protocol version.")
    public Flux<Val> shhVersion(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withShhVersion(), map);
    }

    private Callable<Val> withShhVersion() {
        return () -> EthereumBasicFunctions.toVal(this.web3j.shhVersion().send().getVersion());
    }

    /**
     * Polls whether the client knows the given whisper identity.
     *
     * @param val textual value holding the identity
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "hasIdentity", docs = "Checks if the client holds the private keys for a given identity.")
    public Flux<Val> shhHasIdentity(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withHasIdentity(val), map);
    }

    private Callable<Val> withHasIdentity(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.shhHasIdentity(val.get().textValue()).send().getResult());
    }

    /**
     * Polls new messages of a whisper filter.
     *
     * @param val numeric value holding the filter id
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "shhFilterChanges", docs = "Polling method for whisper filters. Returns new messages since the last call of this method.")
    public Flux<Val> shhGetFilterChanges(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withShhFilterChanges(val), map);
    }

    private Callable<Val> withShhFilterChanges(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.shhGetFilterChanges(val.get().bigIntegerValue()).send().getMessages());
    }

    /**
     * Polls all messages of a whisper filter.
     *
     * @param val numeric value holding the filter id
     * @param map variables, consulted for the polling interval
     */
    @Attribute(name = "messages", docs = "Get all messages matching a filter. Unlike shhFilterChanges this returns all messages.")
    public Flux<Val> shhGetMessages(Val val, Map<String, JsonNode> map) {
        return scheduledFlux(withShhMessages(val), map);
    }

    private Callable<Val> withShhMessages(Val val) {
        return () -> EthereumBasicFunctions.toVal(this.web3j.shhGetMessages(val.get().bigIntegerValue()).send().getMessages());
    }

    /**
     * Turns a single-shot request into a polling stream.
     *
     * <p>The callable is evaluated immediately and then once per polling interval; consecutive
     * equal results are suppressed. If an evaluation fails, {@code Val.NULL} is emitted and the
     * stream completes, i.e. polling does not resume after an error.
     *
     * <p>NOTE(review): the callables appear to perform blocking {@code send()} calls on the
     * thread driving {@code Flux.interval}; consider an explicit scheduler if that matters.
     *
     * @param callable the request to repeat
     * @param map      variables from which the polling interval is taken
     * @return the polling stream
     */
    private Flux<Val> scheduledFlux(Callable<Val> callable, Map<String, JsonNode> map) {
        return Flux.interval(Duration.ZERO, getPollingInterval(map))
                .flatMap(tick -> Mono.fromCallable(callable))
                .distinctUntilChanged()
                .onErrorReturn(Val.NULL);
    }

    /**
     * Determines the polling interval.
     *
     * <p>An interval is only honoured when the {@code ethPipConfig} variable is present. The
     * value is looked up first as a top-level {@code ethPollingInterval} variable (the only
     * place earlier versions consulted) and then inside {@code ethPipConfig} itself. Any
     * integral JSON number that fits into a {@code long} and is positive is accepted;
     * everything else yields the default of {@value #DEFAULT_ETH_POLLING_INTERVAL} ms.
     *
     * @param map the variables, may be {@code null}
     * @return the interval to poll with, never {@code null}
     */
    private static Duration getPollingInterval(Map<String, JsonNode> map) {
        if (map != null) {
            JsonNode config = map.get(ETH_PIP_CONFIG);
            if (config != null) {
                long millis = positiveMillisOrZero(map.get(ETH_POLLING_INTERVAL));
                if (millis <= 0) {
                    // JsonNode.get(String) yields null for non-object nodes, which maps to 0 below.
                    millis = positiveMillisOrZero(config.get(ETH_POLLING_INTERVAL));
                }
                if (millis > 0) {
                    return Duration.ofMillis(millis);
                }
            }
        }
        return Duration.ofMillis(DEFAULT_ETH_POLLING_INTERVAL);
    }

    /**
     * Reads a positive millisecond count from {@code node}.
     *
     * <p>{@code isLong()} alone would reject ordinary JSON integers, which Jackson represents
     * as int nodes, so any integral number convertible to {@code long} is accepted.
     *
     * @return the value if it is a positive integral number, otherwise {@code 0}
     */
    private static long positiveMillisOrZero(JsonNode node) {
        if (node == null || !node.isIntegralNumber() || !node.canConvertToLong()) {
            return 0L;
        }
        return Math.max(node.longValue(), 0L);
    }
}
