package org.radarbase.mock;

import com.opencsv.exceptions.CsvValidationException;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.Credentials;
import okhttp3.FormBody;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.avro.SchemaValidationException;
import org.json.JSONObject;
import org.radarbase.config.ServerConfig;
import org.radarbase.config.YamlConfigLoader;
import org.radarbase.mock.config.AuthConfig;
import org.radarbase.mock.config.BasicMockConfig;
import org.radarbase.mock.config.MockDataConfig;
import org.radarbase.mock.data.MockCsvParser;
import org.radarbase.mock.data.RecordGenerator;
import org.radarbase.producer.BatchedKafkaSender;
import org.radarbase.producer.KafkaSender;
import org.radarbase.producer.direct.DirectSender;
import org.radarbase.producer.rest.ConnectionState;
import org.radarbase.producer.rest.RestClient;
import org.radarbase.producer.rest.RestSender;
import org.radarbase.producer.rest.SchemaRetriever;
import org.radarcns.kafka.ObservationKey;
import org.radarcns.passive.empatica.EmpaticaE4Acceleration;
import org.radarcns.passive.empatica.EmpaticaE4BatteryLevel;
import org.radarcns.passive.empatica.EmpaticaE4BloodVolumePulse;
import org.radarcns.passive.empatica.EmpaticaE4ElectroDermalActivity;
import org.radarcns.passive.empatica.EmpaticaE4InterBeatInterval;
import org.radarcns.passive.empatica.EmpaticaE4Temperature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/radarbase/mock/MockProducer.class */
public class MockProducer {
    private static final Logger logger = LoggerFactory.getLogger(MockProducer.class);
    private final List<MockDevice<ObservationKey>> devices;
    private final List<MockFileSender> files;
    private final List<KafkaSender> senders;
    private final SchemaRetriever retriever;

    public MockProducer(BasicMockConfig basicMockConfig) throws IOException {
        this(basicMockConfig, null);
    }

    public MockProducer(BasicMockConfig basicMockConfig, Path path) throws IOException {
        int intValue = basicMockConfig.getNumberOfDevices().intValue();
        this.retriever = new SchemaRetriever(basicMockConfig.getSchemaRegistry(), 10L);
        List<KafkaSender> list = null;
        try {
            this.devices = new ArrayList(intValue);
            this.files = new ArrayList(intValue);
            List<MockDataConfig> data = basicMockConfig.getData();
            data = data == null ? defaultDataConfig() : data;
            List<RecordGenerator<ObservationKey>> createGenerators = createGenerators(data);
            List<MockCsvParser<ObservationKey>> createMockFiles = createMockFiles(data, path);
            list = createSenders(basicMockConfig, intValue + createMockFiles.size(), basicMockConfig.getAuthConfig());
            if (!createGenerators.isEmpty()) {
                for (int i = 0; i < intValue; i++) {
                    this.devices.add(new MockDevice<>(list.get(i), new ObservationKey("test", "UserID_" + i, "SourceID_" + i), createGenerators));
                }
            }
            for (int i2 = 0; i2 < createMockFiles.size(); i2++) {
                this.files.add(new MockFileSender(list.get(i2 + intValue), createMockFiles.get(i2)));
            }
            this.senders = list;
        } catch (Exception e) {
            if (list != null) {
                Iterator<KafkaSender> it = list.iterator();
                while (it.hasNext()) {
                    it.next().close();
                }
            }
            throw e;
        } catch (CsvValidationException e2) {
            if (list != null) {
                Iterator<KafkaSender> it2 = list.iterator();
                while (it2.hasNext()) {
                    it2.next().close();
                }
            }
            throw new IOException("Cannot read CSV file", e2);
        }
    }

    private List<KafkaSender> createSenders(BasicMockConfig basicMockConfig, int i, AuthConfig authConfig) throws IOException {
        return basicMockConfig.isDirectProducer() ? createDirectSenders(i, basicMockConfig.getSchemaRegistry().getUrlString(), basicMockConfig.getBrokerPaths()) : createRestSenders(i, this.retriever, basicMockConfig.getRestProxy(), basicMockConfig.hasCompression(), authConfig);
    }

    private List<KafkaSender> createDirectSenders(int i, String str, String str2) {
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            Properties properties = new Properties();
            properties.put("key.serializer", KafkaAvroSerializer.class);
            properties.put("value.serializer", KafkaAvroSerializer.class);
            properties.put("schema.registry.url", str);
            properties.put("bootstrap.servers", str2);
            arrayList.add(new DirectSender(properties));
        }
        return arrayList;
    }

    private String requestAccessToken(OkHttpClient okHttpClient, AuthConfig authConfig) throws IOException {
        Request build = new Request.Builder().url(authConfig.getTokenUrl()).post(new FormBody.Builder().add("grant_type", "client_credentials").add("client_id", authConfig.getClientId()).add("client_secret", authConfig.getClientSecret()).build()).addHeader("Authorization", Credentials.basic(authConfig.getClientId(), authConfig.getClientSecret())).build();
        Response execute = okHttpClient.newCall(build).execute();
        try {
            ResponseBody body = execute.body();
            if (body == null) {
                throw new IOException("Cannot request token at " + build.url() + " (" + execute.code() + ") returned no body");
            }
            if (!execute.isSuccessful()) {
                throw new IOException("Cannot request token: at " + build.url() + " (" + execute.code() + "): " + body.string());
            }
            String string = new JSONObject(body.string()).getString("access_token");
            if (execute != null) {
                execute.close();
            }
            return string;
        } catch (Throwable th) {
            if (execute != null) {
                try {
                    execute.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private List<KafkaSender> createRestSenders(int i, SchemaRetriever schemaRetriever, ServerConfig serverConfig, boolean z, AuthConfig authConfig) throws IOException {
        ArrayList arrayList = new ArrayList(i);
        ConnectionState connectionState = new ConnectionState(10L, TimeUnit.SECONDS);
        Headers of = authConfig == null ? Headers.of(new String[0]) : Headers.of(new String[]{"Authorization", "Bearer " + requestAccessToken(new OkHttpClient(), authConfig)});
        for (int i2 = 0; i2 < i; i2++) {
            arrayList.add(new BatchedKafkaSender(new RestSender.Builder().schemaRetriever(schemaRetriever).httpClient(RestClient.newClient().server(serverConfig).gzipCompression(z).timeout(10L, TimeUnit.SECONDS).build()).connectionState(connectionState).headers(of).build(), 1000, 1000));
        }
        return arrayList;
    }

    public void start() throws IOException {
        Iterator<MockDevice<ObservationKey>> it = this.devices.iterator();
        while (it.hasNext()) {
            it.next().start();
        }
        for (MockFileSender mockFileSender : this.files) {
            mockFileSender.send();
            logger.info("Sent data {}", mockFileSender);
        }
    }

    public void shutdown() throws IOException, InterruptedException, SchemaValidationException {
        if (!this.devices.isEmpty()) {
            logger.info("Shutting down mock devices");
            Iterator<MockDevice<ObservationKey>> it = this.devices.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            logger.info("Waiting for mock devices to finish...");
            Iterator<MockDevice<ObservationKey>> it2 = this.devices.iterator();
            while (it2.hasNext()) {
                it2.next().join(5000L);
            }
        }
        logger.info("Closing channels");
        Iterator<KafkaSender> it3 = this.senders.iterator();
        while (it3.hasNext()) {
            it3.next().close();
        }
        Iterator<MockDevice<ObservationKey>> it4 = this.devices.iterator();
        while (it4.hasNext()) {
            it4.next().checkException();
        }
    }

    public static void main(String[] strArr) {
        if (strArr.length != 1) {
            logger.error("This command needs a mock file argument");
            System.exit(1);
        }
        Path absolutePath = Paths.get(strArr[0], new String[0]).toAbsolutePath();
        BasicMockConfig basicMockConfig = null;
        try {
            basicMockConfig = (BasicMockConfig) new YamlConfigLoader().load(absolutePath, BasicMockConfig.class);
        } catch (IOException e) {
            logger.error("Failed to load given mock file {}: {}", absolutePath, e.getMessage());
            System.exit(1);
        }
        try {
            MockProducer mockProducer = new MockProducer(basicMockConfig, absolutePath.getParent());
            mockProducer.start();
            if (!mockProducer.devices.isEmpty()) {
                waitForProducer(mockProducer, basicMockConfig.getDuration());
            }
        } catch (IllegalArgumentException e2) {
            logger.error("{}", e2.getMessage());
            System.exit(1);
        } catch (InterruptedException e3) {
        } catch (Exception e4) {
            logger.error("Failed to start mock producer", e4);
            System.exit(1);
        }
    }

    private static void waitForProducer(MockProducer mockProducer, long j) throws IOException, InterruptedException, SchemaValidationException {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                if (!atomicBoolean.get()) {
                    mockProducer.shutdown();
                }
            } catch (InterruptedException e) {
                logger.warn("Shutdown interrupted", e);
            } catch (Exception e2) {
                logger.warn("Failed to shutdown producer", e2);
            }
        }));
        if (j <= 0) {
            try {
                logger.info("Producing data until interrupted");
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
            }
        } else {
            try {
                logger.info("Producing data for {} seconds", Double.valueOf(j / 1000.0d));
                Thread.sleep(j);
            } catch (InterruptedException e2) {
                logger.warn("Data producing interrupted");
            }
            mockProducer.shutdown();
            atomicBoolean.set(true);
            logger.info("Producing data done.");
        }
    }

    private List<MockDataConfig> defaultDataConfig() {
        MockDataConfig mockDataConfig = new MockDataConfig();
        mockDataConfig.setTopic("android_empatica_e4_acceleration");
        mockDataConfig.setFrequency(32);
        mockDataConfig.setValueSchema(EmpaticaE4Acceleration.class.getName());
        mockDataConfig.setInterval(-2.0d, 2.0d);
        mockDataConfig.setValueFields(Arrays.asList("x", "y", "z"));
        MockDataConfig mockDataConfig2 = new MockDataConfig();
        mockDataConfig2.setTopic("android_empatica_e4_battery_level");
        mockDataConfig2.setValueSchema(EmpaticaE4BatteryLevel.class.getName());
        mockDataConfig2.setFrequency(1);
        mockDataConfig2.setInterval(0.0d, 1.0d);
        mockDataConfig2.setValueField("batteryLevel");
        MockDataConfig mockDataConfig3 = new MockDataConfig();
        mockDataConfig3.setTopic("android_empatica_e4_blood_volume_pulse");
        mockDataConfig3.setValueSchema(EmpaticaE4BloodVolumePulse.class.getName());
        mockDataConfig3.setFrequency(64);
        mockDataConfig3.setInterval(60.0d, 90.0d);
        mockDataConfig3.setValueField("bloodVolumePulse");
        MockDataConfig mockDataConfig4 = new MockDataConfig();
        mockDataConfig4.setTopic("android_empatica_e4_electrodermal_activity");
        mockDataConfig4.setValueSchema(EmpaticaE4ElectroDermalActivity.class.getName());
        mockDataConfig4.setValueField("electroDermalActivity");
        mockDataConfig4.setFrequency(4);
        mockDataConfig4.setInterval(0.01d, 0.05d);
        MockDataConfig mockDataConfig5 = new MockDataConfig();
        mockDataConfig5.setTopic("android_empatica_e4_inter_beat_interval");
        mockDataConfig5.setValueSchema(EmpaticaE4InterBeatInterval.class.getName());
        mockDataConfig5.setValueField("interBeatInterval");
        mockDataConfig5.setFrequency(1);
        mockDataConfig5.setInterval(40.0d, 150.0d);
        MockDataConfig mockDataConfig6 = new MockDataConfig();
        mockDataConfig6.setTopic("android_empatica_e4_temperature");
        mockDataConfig6.setValueSchema(EmpaticaE4Temperature.class.getName());
        mockDataConfig6.setFrequency(4);
        mockDataConfig6.setInterval(20.0d, 60.0d);
        mockDataConfig6.setValueField("temperature");
        return Arrays.asList(mockDataConfig, mockDataConfig2, mockDataConfig3, mockDataConfig4, mockDataConfig5, mockDataConfig6);
    }

    private List<RecordGenerator<ObservationKey>> createGenerators(List<MockDataConfig> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (MockDataConfig mockDataConfig : list) {
            if (mockDataConfig.getDataFile() == null) {
                arrayList.add(new RecordGenerator(mockDataConfig, ObservationKey.class));
            }
        }
        return arrayList;
    }

    private List<MockCsvParser<ObservationKey>> createMockFiles(List<MockDataConfig> list, Path path) throws IOException, CsvValidationException {
        ArrayList arrayList = new ArrayList(list.size());
        Path path2 = path;
        if (path2 == null) {
            path2 = Paths.get(".", new String[0]).toAbsolutePath();
        }
        for (MockDataConfig mockDataConfig : list) {
            if (mockDataConfig.getDataFile() != null) {
                logger.info("Reading mock data from {}", mockDataConfig.getDataFile());
                arrayList.add(new MockCsvParser(mockDataConfig, path2));
            } else {
                logger.info("Generating mock data from {}", mockDataConfig);
            }
        }
        return arrayList;
    }
}
