package cn.benma666.sjzt.kafka;

import cn.benma666.constants.UtilConstInstance;
import cn.benma666.domain.SysSjglSjzt;
import cn.benma666.domain.SysSjglZnjh;
import cn.benma666.exception.MyException;
import cn.benma666.iframe.InterfaceLog;
import cn.benma666.iframe.Result;
import cn.benma666.myutils.ClassUtil;
import cn.benma666.myutils.DateUtil;
import cn.benma666.myutils.FileUtil;
import cn.benma666.sjzt.BasicSjzt;
import cn.benma666.sjzt.IFile;
import cn.benma666.sjzt.SjztBzcwjdxExecption;
import cn.benma666.sjzt.SjztExecRunnable;
import cn.benma666.sjzt.SjztPooledObjectFactory;
import cn.benma666.sjzt.bdwj.BdwjFile;
import com.alibaba.druid.util.Utils;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

/* loaded from: input_file:cn/benma666/sjzt/kafka/Kafka.class */
public class Kafka extends BasicSjzt {
    private static Kafka kafka = null;
    private final GenericObjectPool objectPool;

    protected Kafka(String str, SysSjglSjzt sysSjglSjzt) {
        super(str, sysSjglSjzt);
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setTestOnCreate(true);
        genericObjectPoolConfig.setTestOnBorrow(true);
        genericObjectPoolConfig.setMinIdle(1);
        genericObjectPoolConfig.setMaxIdle(2);
        genericObjectPoolConfig.setMaxTotal(5);
        genericObjectPoolConfig.setMaxWait(Duration.ofSeconds(300L));
        ClassUtil.plMethodInvoke(genericObjectPoolConfig, getSjzt().getKzxxObj().getJSONObject("ljcpz"));
        this.objectPool = new GenericObjectPool(new SjztPooledObjectFactory(sysSjglSjzt), genericObjectPoolConfig);
        KafkaClient kafkaClient = null;
        try {
            try {
                kafkaClient = borrowClient();
                returnClient(kafkaClient);
                if (kafka == null) {
                    kafka = this;
                }
                cache.put(str, this);
            } catch (Exception e) {
                throw new MyException(str + "初始化失败", (Throwable) e);
            }
        } catch (Throwable th) {
            returnClient(kafkaClient);
            throw th;
        }
    }

    public void returnClient(KafkaClient kafkaClient) {
        this.log.trace("释放回连接池：{}", this.name);
        if (kafkaClient != null) {
            this.objectPool.returnObject(kafkaClient);
        }
    }

    public KafkaClient borrowClient() throws Exception {
        this.log.trace("从连接池获取：{}", this.name);
        return (KafkaClient) this.objectPool.borrowObject();
    }

    public static Kafka use(String str) {
        return use(str, getSjzt(str));
    }

    public static Kafka use(String str, SysSjglSjzt sysSjglSjzt) {
        Kafka kafka2 = (Kafka) cache.get(str);
        if (kafka2 == null) {
            kafka2 = new Kafka(str, sysSjglSjzt);
        }
        return kafka2;
    }

    public static Result cszt(SysSjglSjzt sysSjglSjzt) {
        try {
            KafkaClient kafkaClient = new KafkaClient(sysSjglSjzt);
            kafkaClient.getAdminClient().listTopics();
            kafkaClient.close();
            return success("测试成功");
        } catch (Throwable th) {
            slog.debug("{}测试失败", sysSjglSjzt, th);
            return failed("载体测试不通过：" + th.getMessage());
        }
    }

    public static boolean validateClient(SysSjglSjzt sysSjglSjzt, Object obj) {
        try {
            ListTopicsOptions listTopicsOptions = new ListTopicsOptions();
            listTopicsOptions.timeoutMs(5000);
            ((KafkaClient) obj).getAdminClient().listTopics(listTopicsOptions);
            return true;
        } catch (Throwable th) {
            slog.debug("ftp验证无效：{}", sysSjglSjzt.getMc(), th);
            return false;
        }
    }

    public static void destroyClient(SysSjglSjzt sysSjglSjzt, Object obj) throws Exception {
        ((KafkaClient) obj).close();
    }

    public static KafkaClient createClient(SysSjglSjzt sysSjglSjzt) {
        return new KafkaClient(sysSjglSjzt);
    }

    public Object exec(SjztExecRunnable<KafkaClient> sjztExecRunnable) {
        KafkaClient kafkaClient = null;
        try {
            try {
                kafkaClient = borrowClient();
                Object exec = sjztExecRunnable.exec(kafkaClient);
                if (kafkaClient != null) {
                    this.objectPool.returnObject(kafkaClient);
                }
                return exec;
            } catch (Exception e) {
                throw new MyException("kafka执行异常", (Throwable) e);
            }
        } catch (Throwable th) {
            if (kafkaClient != null) {
                this.objectPool.returnObject(kafkaClient);
            }
            throw th;
        }
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public List<IFile> listFiles(SysSjglZnjh sysSjglZnjh) {
        return (List) exec(kafkaClient -> {
            try {
                KafkaConsumer kafkaConsumer = kafkaClient.getKafkaConsumer();
                ArrayList arrayList = new ArrayList();
                kafkaConsumer.subscribe(Collections.singleton(sysSjglZnjh.getSrml().replace(UtilConstInstance.FXG, UtilConstInstance.NULL_STR)));
                for (int i = 0; arrayList.size() == 0 && i <= 10; i++) {
                    Iterator it = kafkaConsumer.poll(Duration.ofMillis(100L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        KafkaFile kafkaFile = new KafkaFile(UtilConstInstance.FXG, consumerRecord, this);
                        kafkaFile.setGzml(sysSjglZnjh.getSrml().replace(UtilConstInstance.FXG, UtilConstInstance.NULL_STR));
                        arrayList.add(kafkaFile);
                        this.log.debug("offset = {}, value = {}", Long.valueOf(consumerRecord.offset()), consumerRecord.value());
                    }
                }
                kafkaConsumer.commitAsync();
                kafkaConsumer.unsubscribe();
                return arrayList;
            } catch (Exception e) {
                throw new MyException("获取kafka文件列表失败", (Throwable) e);
            }
        });
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public InputStream getInputStream(IFile iFile) {
        if (iFile instanceof KafkaFile) {
            return new ByteArrayInputStream(((String) ((KafkaFile) iFile).getFile().value()).getBytes());
        }
        throw new MyException("不支持非kafkafile");
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public boolean delete(IFile iFile) {
        if (iFile instanceof KafkaFile) {
            return true;
        }
        throw new MyException("不支持非kafkafile");
    }

    public boolean pub(String str, String str2) throws Exception {
        return save(new ByteArrayInputStream(str2.getBytes(StandardCharsets.UTF_8)), new BdwjFile(str, null));
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public boolean save(InputStream inputStream, IFile iFile) {
        return ((Boolean) exec(kafkaClient -> {
            ProducerRecord producerRecord = null;
            try {
                try {
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    producerRecord = new ProducerRecord(iFile.getParent().replace(UtilConstInstance.FXG, UtilConstInstance.NULL_STR), Utils.read(inputStream));
                    kafkaClient.getKafkaProducer().send(producerRecord, (recordMetadata, exc) -> {
                        if (exc == null) {
                            atomicInteger.set(2);
                        } else {
                            atomicInteger.set(1);
                            this.log.error("消息推送异常：" + producerRecord.topic() + ">" + ((String) producerRecord.value()), exc);
                        }
                    });
                    while (atomicInteger.get() == 0) {
                        Thread.sleep(100L);
                    }
                    Boolean valueOf = Boolean.valueOf(atomicInteger.get() == 2);
                    FileUtil.closeStream(inputStream);
                    return valueOf;
                } catch (Throwable th) {
                    if (producerRecord == null) {
                        throw new MyException("消息推送异常", th);
                    }
                    throw new MyException("消息推送异常：" + producerRecord.topic() + ">" + ((String) producerRecord.value()), th);
                }
            } catch (Throwable th2) {
                FileUtil.closeStream(inputStream);
                throw th2;
            }
        })).booleanValue();
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public String getRootPath() {
        return this.sjzt.getDxgsNew();
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public long getSize(IFile iFile) {
        if (iFile instanceof KafkaFile) {
            return ((String) ((KafkaFile) iFile).getFile().value()).getBytes().length;
        }
        throw new SjztBzcwjdxExecption("不支持非kafkafile");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (!this.objectPool.isClosed()) {
            this.objectPool.close();
        }
        cache.remove(this.name);
        if (this == kafka) {
            kafka = null;
        }
    }

    @Override // cn.benma666.sjzt.BasicSjzt
    public void sjztjt(SysSjglZnjh sysSjglZnjh, InterfaceLog interfaceLog) {
        exec(kafkaClient -> {
            long scSjStrToLong = DateUtil.scSjStrToLong(sysSjglZnjh.getJtjg());
            String srml = sysSjglZnjh.getSrml();
            KafkaConsumer kafkaConsumer = kafkaClient.getKafkaConsumer();
            kafkaConsumer.subscribe(Collections.singleton(srml.replace(UtilConstInstance.FXG, UtilConstInstance.NULL_STR)));
            while (true) {
                Iterator it = kafkaConsumer.poll(Duration.ofMillis(scSjStrToLong)).iterator();
                while (it.hasNext()) {
                    KafkaFile kafkaFile = new KafkaFile(UtilConstInstance.FXG, (ConsumerRecord) it.next(), this);
                    kafkaFile.setGzml(srml);
                    sysSjglZnjh.getTp().run(() -> {
                        this.znjh.znjh(sysSjglZnjh, interfaceLog, kafkaFile);
                    });
                }
            }
        });
    }

    public Collection<String> getAllAcl() {
        return (Collection) exec(kafkaClient -> {
            return kafkaClient.getAdminClient().describeAcls(AclBindingFilter.ANY).values().get();
        });
    }

    public Set<String> getTopics() {
        return (Set) exec(kafkaClient -> {
            return kafkaClient.getAdminClient().listTopics().names().get();
        });
    }

    public Collection<String> getConsumerGroups() {
        return (Collection) exec(kafkaClient -> {
            return kafkaClient.getAdminClient().listConsumerGroups().valid().get();
        });
    }

    public void addTopicReadOrWriterAcl(String str, String str2) {
        exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, str, PatternType.LITERAL);
            AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + str2, "*", AclOperation.WRITE, AclPermissionType.ALLOW);
            AccessControlEntry accessControlEntry2 = new AccessControlEntry("User:" + str2, "*", AclOperation.READ, AclPermissionType.ALLOW);
            AclBinding aclBinding = new AclBinding(resourcePattern, accessControlEntry);
            AclBinding aclBinding2 = new AclBinding(resourcePattern, accessControlEntry2);
            ArrayList arrayList = new ArrayList();
            arrayList.add(aclBinding);
            arrayList.add(aclBinding2);
            adminClient.createAcls(arrayList).all().get();
            return null;
        });
    }

    public void addGroupReadAcl(String str, String str2) {
        exec(kafkaClient -> {
            kafkaClient.getAdminClient().createAcls(Collections.singletonList(new AclBinding(new ResourcePattern(ResourceType.GROUP, str, PatternType.LITERAL), new AccessControlEntry("User:" + str2, "*", AclOperation.READ, AclPermissionType.ALLOW)))).all().get();
            return null;
        });
    }

    public void creatTopic(String str) {
        exec(kafkaClient -> {
            AdminClient adminClient = kafkaClient.getAdminClient();
            if (((Set) adminClient.listTopics().names().get()).contains(str)) {
                return null;
            }
            adminClient.createTopics(Collections.singletonList(new NewTopic(str, 1, (short) 1))).all().get();
            return null;
        });
    }

    public void deleteTopic(String str) {
        exec(kafkaClient -> {
            Iterator it = kafkaClient.getAdminClient().deleteTopics(Collections.singletonList(str)).values().entrySet().iterator();
            while (it.hasNext()) {
                ((KafkaFuture) ((Map.Entry) it.next()).getValue()).get();
            }
            return null;
        });
    }
}
