package org.apache.eventmesh.openconnect;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.client.tcp.EventMeshTCPClient;
import org.apache.eventmesh.client.tcp.EventMeshTCPClientFactory;
import org.apache.eventmesh.client.tcp.common.MessageUtils;
import org.apache.eventmesh.client.tcp.conf.EventMeshTCPClientConfig;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.eventmesh.openconnect.api.config.SourceConfig;
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/openconnect/SourceWorker.class */
public class SourceWorker implements ConnectorWorker {
    private static final Logger log = LoggerFactory.getLogger(SourceWorker.class);
    private final Source source;
    private final SourceConfig config;
    private final EventMeshTCPClient<CloudEvent> eventMeshTCPClient;
    private final ExecutorService pollService = Executors.newSingleThreadExecutor();
    private final ExecutorService startService = Executors.newSingleThreadExecutor();
    private volatile boolean isRunning = false;
    private final BlockingQueue<ConnectRecord> queue = new LinkedBlockingQueue(1000);

    public SourceWorker(Source source, SourceConfig sourceConfig) throws Exception {
        this.source = source;
        this.config = sourceConfig;
        this.eventMeshTCPClient = buildEventMeshPubClient(sourceConfig);
        this.eventMeshTCPClient.init();
    }

    private EventMeshTCPClient<CloudEvent> buildEventMeshPubClient(SourceConfig sourceConfig) {
        String meshAddress = sourceConfig.getPubSubConfig().getMeshAddress();
        String str = meshAddress.split(":")[0];
        int parseInt = Integer.parseInt(meshAddress.split(":")[1]);
        return EventMeshTCPClientFactory.createEventMeshTCPClient(EventMeshTCPClientConfig.builder().host(str).port(parseInt).userAgent(MessageUtils.generatePubClient(UserAgent.builder().env(sourceConfig.getPubSubConfig().getEnv()).host("localhost").password(sourceConfig.getPubSubConfig().getPassWord()).username(sourceConfig.getPubSubConfig().getUserName()).group(sourceConfig.getPubSubConfig().getGroup()).path("/").port(8362).subsystem(sourceConfig.getPubSubConfig().getAppId()).pid(Integer.parseInt(SystemUtils.getProcessId())).version("2.0").idc(sourceConfig.getPubSubConfig().getIdc()).build())).build(), CloudEvent.class);
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void start() {
        log.info("source worker starting {}", this.source.name());
        log.info("event mesh address is {}", this.config.getPubSubConfig().getMeshAddress());
        this.isRunning = true;
        this.pollService.execute(this::startPoll);
        this.startService.execute(() -> {
            try {
                startConnector();
            } catch (Exception e) {
                log.error("source worker[{}] start fail", this.source.name(), e);
                throw new RuntimeException(e);
            }
        });
    }

    public void startPoll() {
        while (this.isRunning) {
            ConnectRecord connectRecord = null;
            try {
                connectRecord = this.queue.poll(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("poll connect record error", e);
            }
            if (connectRecord != null) {
                this.eventMeshTCPClient.publish(convertRecordToEvent(connectRecord), 3000L);
            }
        }
    }

    private void startConnector() throws Exception {
        this.source.start();
        while (this.isRunning) {
            List<ConnectRecord> poll = this.source.poll();
            if (!CollectionUtils.isEmpty(poll)) {
                Iterator<ConnectRecord> it = poll.iterator();
                while (it.hasNext()) {
                    this.queue.put(it.next());
                }
            }
        }
    }

    private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
        return CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSubject(this.config.getPubSubConfig().getSubject()).withSource(URI.create("/")).withDataContentType("application/cloudevents+json").withType("cloudevents").withData(((String) Objects.requireNonNull(JsonUtils.toJSONString(connectRecord.getData()))).getBytes(StandardCharsets.UTF_8)).withExtension("ttl", 10000).build();
    }

    @Override // org.apache.eventmesh.openconnect.ConnectorWorker
    public void stop() {
        log.info("source worker stopping");
        this.isRunning = false;
        try {
            this.source.stop();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("source destroy error", e);
        }
        this.pollService.shutdown();
        try {
            this.pollService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e2) {
            log.error("awaitTermination error", e2);
        }
        try {
            this.eventMeshTCPClient.close();
        } catch (Exception e3) {
            log.error("event mesh client close error", e3);
        }
        log.info("source worker stopped");
    }
}
