package ars.module.system.service;

import ars.database.repository.Repositories;
import ars.database.repository.Repository;
import ars.database.service.StandardGeneralService;
import ars.invoke.channel.http.Https;
import ars.invoke.event.InvokeAfterEvent;
import ars.invoke.event.InvokeBeforeEvent;
import ars.invoke.event.InvokeCompleteEvent;
import ars.invoke.event.InvokeErrorEvent;
import ars.invoke.event.InvokeEvent;
import ars.invoke.event.InvokeListener;
import ars.invoke.remote.Protocol;
import ars.invoke.remote.Remotes;
import ars.invoke.request.Requester;
import ars.module.system.model.Message;
import ars.module.system.model.Subscribe;
import ars.server.Servers;
import ars.server.timer.AbstractTimerServer;
import ars.util.Beans;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.http.client.methods.HttpUriRequest;

/* loaded from: input_file:ars/module/system/service/AbstractSubscribeService.class */
public abstract class AbstractSubscribeService<T extends Subscribe> extends StandardGeneralService<T> implements SubscribeService<T>, InvokeListener<InvokeEvent> {
    private int batch = 1000;
    private Map<String, T> subscribes;

    public AbstractSubscribeService() {
        initRefreshServer();
    }

    public int getBatch() {
        return this.batch;
    }

    public void setBatch(int i) {
        if (i < 1) {
            throw new IllegalArgumentException("Illegal batch:" + i);
        }
        this.batch = i;
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [ars.module.system.service.AbstractSubscribeService$1] */
    protected void initRefreshServer() {
        new AbstractTimerServer() { // from class: ars.module.system.service.AbstractSubscribeService.1
            protected void execute() throws Exception {
                AbstractSubscribeService.this.refresh();
            }
        }.start();
    }

    protected void initSubscribeCache() {
        if (this.subscribes == null) {
            List<Subscribe> list = getRepository().query().list();
            this.subscribes = new HashMap(list.size());
            for (Subscribe subscribe : list) {
                this.subscribes.put(subscribe.getTarget(), subscribe);
            }
        }
    }

    protected T getSubscribe(Requester requester) {
        if (this.subscribes == null) {
            synchronized (this) {
                initSubscribeCache();
            }
        }
        return this.subscribes.get(requester.getUri());
    }

    protected void push(Requester requester, Subscribe subscribe) throws Exception {
        Protocol protocol = subscribe.getProtocol();
        String host = subscribe.getHost();
        Integer port = subscribe.getPort();
        String resource = subscribe.getResource();
        if (protocol == Protocol.http || protocol == Protocol.https) {
            HttpUriRequest httpUriRequest = Https.getHttpUriRequest(Https.getUrl(protocol, host, port.intValue(), resource).toString(), Https.Method.POST, requester.getParameters());
            httpUriRequest.addHeader("token", requester.getToken().getCode());
            httpUriRequest.addHeader("client", Remotes.getClient());
            Https.getBytes(httpUriRequest);
            return;
        }
        if (protocol != Protocol.ssl && protocol != Protocol.tcp && protocol != Protocol.udp) {
            throw new RuntimeException("Not support protocol:" + protocol);
        }
        Remotes.invoke(Remotes.getProxy(protocol, host, port.intValue()), requester.getToken(), resource, requester.getParameters());
    }

    protected void synchron(Requester requester, Subscribe subscribe) {
        try {
            push(requester, subscribe);
        } catch (Exception e) {
            Servers.logger.error("Message synchronization failure", e);
            Repository repository = Repositories.getRepository(Message.class);
            Message message = (Message) Beans.getInstance(repository.getModel());
            message.setSubscribe(subscribe);
            message.setRequester(requester);
            repository.save(message);
        }
    }

    protected void refresh() {
        Repository repository = Repositories.getRepository(Message.class);
        int ceil = (int) Math.ceil(repository.query().count() / this.batch);
        for (int i = 1; i <= ceil; i++) {
            for (final Message message : repository.query().paging(i, this.batch).asc(new String[]{"dateJoined"}).list()) {
                try {
                    Servers.submit(new Callable<Object>() { // from class: ars.module.system.service.AbstractSubscribeService.2
                        @Override // java.util.concurrent.Callable
                        public Object call() throws Exception {
                            AbstractSubscribeService.this.push(message.getRequester(), message.getSubscribe());
                            return null;
                        }
                    }).get();
                    repository.delete(message);
                } catch (Exception e) {
                    Servers.logger.error("Message synchronization failure", e);
                    message.setResend(Integer.valueOf(message.getResend().intValue() + 1));
                    repository.update(message);
                }
            }
        }
    }

    public Serializable saveObject(Requester requester, T t) {
        Serializable saveObject = super.saveObject(requester, t);
        synchronized (this) {
            initSubscribeCache();
            this.subscribes.put(t.getTarget(), t);
        }
        return saveObject;
    }

    public void updateObject(Requester requester, T t) {
        super.updateObject(requester, t);
        synchronized (this) {
            initSubscribeCache();
            this.subscribes.put(t.getTarget(), t);
        }
    }

    public void deleteObject(Requester requester, T t) {
        super.deleteObject(requester, t);
        synchronized (this) {
            initSubscribeCache();
            this.subscribes.remove(t.getTarget());
        }
    }

    public void onInvokeEvent(InvokeEvent invokeEvent) {
        final Requester source = invokeEvent.getSource();
        final T subscribe = getSubscribe(source);
        if (subscribe != null) {
            if ((subscribe.getEvent() == Subscribe.Event.BEFORE && (invokeEvent instanceof InvokeBeforeEvent)) || ((subscribe.getEvent() == Subscribe.Event.AFTER && (invokeEvent instanceof InvokeAfterEvent)) || ((subscribe.getEvent() == Subscribe.Event.ERROR && (invokeEvent instanceof InvokeErrorEvent)) || (subscribe.getEvent() == Subscribe.Event.COMPLETE && (invokeEvent instanceof InvokeCompleteEvent))))) {
                Servers.execute(new Runnable() { // from class: ars.module.system.service.AbstractSubscribeService.3
                    @Override // java.lang.Runnable
                    public void run() {
                        AbstractSubscribeService.this.synchron(source, subscribe);
                    }
                });
            }
        }
    }
}
