/*
 * Decompiled with CFR 0.152.
 */
package org.rx.io;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.rx.bean.$;
import org.rx.bean.DataTable;
import org.rx.bean.RandomList;
import org.rx.bean.Tuple;
import org.rx.core.Extends;
import org.rx.core.Linq;
import org.rx.core.Strings;
import org.rx.exception.TraceHandler;
import org.rx.io.EntityDatabase;
import org.rx.io.EntityDatabaseImpl;
import org.rx.io.EntityQueryLambda;
import org.rx.net.Sockets;
import org.rx.net.nameserver.NameserverClient;
import org.rx.net.rpc.Remoting;
import org.rx.net.rpc.RpcClientConfig;
import org.rx.util.function.BiAction;
import org.rx.util.function.BiFunc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardingEntityDatabase
implements EntityDatabase {
    private static final Logger log = LoggerFactory.getLogger(ShardingEntityDatabase.class);
    public static final int DEFAULT_PORT = 3305;
    static final String APP_NAME = "EDB";
    final EntityDatabaseImpl local;
    final int rpcPort;
    final NameserverClient nsClient = new NameserverClient("EDB");
    final RandomList<Tuple<InetSocketAddress, EntityDatabase>> nodes = new RandomList();
    boolean enableAsync = true;
    boolean dynamicNodes = true;

    public ShardingEntityDatabase(String ... registerEndpoints) {
        this(3305, registerEndpoints);
    }

    public ShardingEntityDatabase(int rpcPort, String ... registerEndpoints) {
        this("./rx", null, 0, rpcPort, registerEndpoints);
    }

    public ShardingEntityDatabase(String filePath, String timeRollingPattern, int maxConnections, int rpcPort, String ... registerEndpoints) {
        this.nodes.setSortFunc(p -> ((InetSocketAddress)p.left).getHostString());
        this.local = new EntityDatabaseImpl(filePath, timeRollingPattern, maxConnections);
        this.rpcPort = rpcPort;
        Remoting.register(this.local, rpcPort, false);
        ((CompletableFuture)this.nsClient.registerAsync(registerEndpoints).whenComplete((r, e) -> {
            if (e != null) {
                return;
            }
            this.nodes.add(Tuple.of(new InetSocketAddress(Sockets.getLoopbackAddress(), rpcPort), this.local));
            this.nodes.addAll((Collection<Tuple<InetSocketAddress, EntityDatabase>>)Linq.from(this.nsClient.discoverAll(APP_NAME, true)).select(p -> {
                InetSocketAddress ep = new InetSocketAddress((InetAddress)p, rpcPort);
                return Tuple.of(ep, Remoting.createFacade(EntityDatabase.class, RpcClientConfig.poolMode(ep, 2, this.local.maxConnections)));
            }).toList());
            log.info("{} init {} sharding nodes", (Object)APP_NAME, (Object)this.nodes.size());
            try {
                this.nsClient.waitInject();
            }
            catch (TimeoutException ex) {
                TraceHandler.INSTANCE.log(ex);
            }
        })).join();
        this.nsClient.onAppAddressChanged.combine((s, e) -> {
            if (!Strings.hashEquals(APP_NAME, e.getAppName())) {
                return;
            }
            InetSocketAddress ep = new InetSocketAddress(e.getAddress(), rpcPort);
            log.info("{} address registered: {} -> {} isUp={}", new Object[]{APP_NAME, Linq.from(this.nodes).toJoinString(",", p -> ((InetSocketAddress)p.left).toString()), ep, e.isUp()});
            RandomList<Tuple<InetSocketAddress, EntityDatabase>> randomList = this.nodes;
            synchronized (randomList) {
                if (e.isUp()) {
                    if (!Linq.from(this.nodes).any(p -> ((InetSocketAddress)p.left).equals(ep))) {
                        this.nodes.add(Tuple.of(ep, Remoting.createFacade(EntityDatabase.class, RpcClientConfig.poolMode(ep, 2, this.local.maxConnections))));
                    }
                } else {
                    this.nodes.removeIf((Predicate<Tuple<InetSocketAddress, EntityDatabase>>)((Predicate<Tuple>)p -> ((InetSocketAddress)p.left).equals(ep)));
                }
            }
        });
    }

    @Override
    public synchronized void close() {
        this.nsClient.close();
        this.nodes.clear();
        this.local.close();
    }

    @Override
    public <T> void save(T entity) {
        EntityDatabaseImpl.SqlMeta meta = this.local.getMeta(entity.getClass());
        Object id = ((Field)meta.primaryKey.getValue().left).get(entity);
        this.invokeSharding(p -> {
            p.save(entity);
            return null;
        }, id);
    }

    @Override
    public <T> void save(T entity, boolean doInsert) {
        EntityDatabaseImpl.SqlMeta meta = this.local.getMeta(entity.getClass());
        Object id = ((Field)meta.primaryKey.getValue().left).get(entity);
        this.invokeSharding(p -> {
            p.save(entity, doInsert);
            return null;
        }, id);
    }

    @Override
    public <T> boolean deleteById(Class<T> entityType, Serializable id) {
        if (this.dynamicNodes) {
            AtomicBoolean rf = new AtomicBoolean();
            this.invokeAll(p -> {
                if (p.deleteById(entityType, id)) {
                    rf.set(true);
                    Extends.circuitContinue(false);
                }
            });
            return rf.get();
        }
        return this.invokeSharding(p -> p.deleteById(entityType, id), id);
    }

    @Override
    public <T> long delete(EntityQueryLambda<T> query) {
        AtomicLong rf = new AtomicLong();
        this.invokeAll(p -> rf.addAndGet(p.delete(query)));
        return rf.get();
    }

    @Override
    public <T> long count(EntityQueryLambda<T> query) {
        AtomicLong rf = new AtomicLong();
        this.invokeAll(p -> rf.addAndGet(p.count(query)));
        return rf.get();
    }

    @Override
    public <T> boolean exists(EntityQueryLambda<T> query) {
        AtomicBoolean rf = new AtomicBoolean();
        this.invokeAll(p -> {
            if (p.exists(query)) {
                rf.set(true);
                Extends.circuitContinue(false);
            }
        });
        return rf.get();
    }

    @Override
    public <T> boolean existsById(Class<T> entityType, Serializable id) {
        if (this.dynamicNodes) {
            AtomicBoolean rf = new AtomicBoolean();
            this.invokeAll(p -> {
                if (p.existsById(entityType, id)) {
                    rf.set(true);
                    Extends.circuitContinue(false);
                }
            });
            return rf.get();
        }
        return this.invokeSharding(p -> p.existsById(entityType, id), id);
    }

    @Override
    public <T> T findById(Class<T> entityType, Serializable id) {
        if (this.dynamicNodes) {
            $ rf = $.$();
            this.invokeAll(p -> {
                rf.v = p.findById(entityType, id);
                if (rf.v != null) {
                    Extends.circuitContinue(false);
                }
            });
            return rf.v;
        }
        return (T)this.invokeSharding(p -> p.findById(entityType, id), id);
    }

    @Override
    public <T> T findOne(EntityQueryLambda<T> query) {
        $ rf = $.$();
        this.invokeAll(p -> {
            rf.v = p.findOne(query);
            if (rf.v != null) {
                Extends.circuitContinue(false);
            }
        });
        return rf.v;
    }

    @Override
    public <T> List<T> findBy(EntityQueryLambda<T> query) {
        AbstractList rf = this.enableAsync ? new Vector(this.nodes.size()) : new ArrayList(this.nodes.size());
        this.invokeAll(p -> rf.addAll(p.findBy(query)));
        return EntityQueryLambda.sharding(rf, query);
    }

    @Override
    public void compact() {
        this.invokeAll(EntityDatabase::compact);
    }

    @Override
    public <T> void dropMapping(Class<T> entityType) {
        this.invokeAll(p -> p.dropMapping(entityType));
    }

    @Override
    public void createMapping(Class<?> ... entityTypes) {
        this.invokeAll(p -> p.createMapping(entityTypes));
    }

    @Override
    public String tableName(Class<?> entityType) {
        return this.local.tableName(entityType);
    }

    @Override
    public <T> DataTable executeQuery(String sql, Class<T> entityType) {
        if (Strings.startsWithIgnoreCase((CharSequence)sql, (CharSequence)"EXPLAIN")) {
            return this.local.executeQuery(sql, entityType);
        }
        Vector<DataTable> rf = this.enableAsync ? new Vector(this.nodes.size()) : new ArrayList(this.nodes.size());
        this.invokeAll(p -> rf.add(p.executeQuery(sql, entityType)));
        return EntityDatabaseImpl.sharding(rf, sql);
    }

    @Override
    public int executeUpdate(String sql) {
        AtomicInteger rf = new AtomicInteger();
        this.invokeAll(p -> rf.addAndGet(p.executeUpdate(sql)));
        return rf.get();
    }

    @Override
    public boolean isInTransaction() {
        return this.local.isInTransaction();
    }

    @Override
    public void begin() {
    }

    @Override
    public void begin(int transactionIsolation) {
    }

    @Override
    public void commit() {
    }

    @Override
    public void rollback() {
    }

    <T> T invokeSharding(BiFunc<EntityDatabase, T> fn, Object shardingKey) {
        int len = this.nodes.size();
        int i = Math.abs(shardingKey.hashCode()) % len;
        EntityDatabase db = (EntityDatabase)this.nodes.get((int)i).right;
        return fn.invoke(db);
    }

    void invokeAll(BiAction<EntityDatabase> fn) {
        if (this.enableAsync) {
            Linq.from(this.nodes, true).forEach(tuple -> fn.accept((EntityDatabase)tuple.right));
            return;
        }
        Extends.eachQuietly(this.nodes, tuple -> fn.invoke((EntityDatabase)tuple.right));
    }

    public void setEnableAsync(boolean enableAsync) {
        this.enableAsync = enableAsync;
    }

    public void setDynamicNodes(boolean dynamicNodes) {
        this.dynamicNodes = dynamicNodes;
    }
}

