package io.leoplatform.sdk.oracle;

import io.leoplatform.sdk.ExecutorManager;
import io.leoplatform.sdk.LoadingStream;
import io.leoplatform.sdk.StreamStats;
import io.leoplatform.sdk.payload.EventPayload;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.TableChangeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:io/leoplatform/sdk/oracle/OracleChangeWriter.class */
public final class OracleChangeWriter implements DatabaseChangeListener {
    private static final Logger log = LoggerFactory.getLogger(OracleChangeWriter.class);
    private final LoadingStream stream;
    private final ExecutorManager executorManager;
    private final BlockingQueue<DatabaseChangeEvent> payloads = new LinkedBlockingQueue();
    private final Queue<CompletableFuture<Void>> pendingWrites = new LinkedList();
    private final Lock lock = new ReentrantLock();
    private final Condition changedRows = this.lock.newCondition();
    private final AtomicBoolean running = new AtomicBoolean(true);

    @Inject
    public OracleChangeWriter(LoadingStream loadingStream, ExecutorManager executorManager) {
        this.stream = loadingStream;
        this.executorManager = executorManager;
        CompletableFuture.runAsync(this::asyncWriter, executorManager.get());
    }

    public void onDatabaseChangeNotification(DatabaseChangeEvent databaseChangeEvent) {
        log.info("Received database notification {}", databaseChangeEvent);
        if (this.running.get()) {
            this.lock.lock();
            try {
                this.payloads.put(databaseChangeEvent);
                this.changedRows.signalAll();
            } catch (InterruptedException e) {
                log.warn("Batch writer stopped unexpectedly");
                this.running.set(false);
            } finally {
                this.lock.unlock();
            }
        }
    }

    private void asyncWriter() {
        while (this.running.get()) {
            this.lock.lock();
            try {
                try {
                    this.changedRows.await(200L, TimeUnit.MILLISECONDS);
                    LinkedList linkedList = new LinkedList();
                    this.payloads.drainTo(linkedList);
                    if (!linkedList.isEmpty()) {
                        Executor executor = this.executorManager.get();
                        this.pendingWrites.add(CompletableFuture.runAsync(() -> {
                            sendToBus(linkedList);
                        }, executor).thenRunAsync(this::removeCompleted, executor));
                    }
                    this.lock.unlock();
                } catch (InterruptedException e) {
                    log.warn("Oracle batch change writer stopped unexpectedly");
                    this.running.set(false);
                    this.lock.unlock();
                }
            } catch (Throwable th) {
                this.lock.unlock();
                throw th;
            }
        }
    }

    private void sendToBus(Queue<DatabaseChangeEvent> queue) {
        Map<String, Set<String>> tableChanges = toTableChanges(queue);
        tableChanges.forEach((str, set) -> {
            log.info("Sending rows for {}", str);
            log.info(String.join(",", set));
        });
        Optional map = Optional.of(tableChanges).filter(map2 -> {
            return !map2.isEmpty();
        }).map((v0) -> {
            return v0.entrySet();
        }).map((v0) -> {
            return v0.stream();
        }).map(this::reduceEntries).map((v0) -> {
            return v0.build();
        }).map(this::toPayload);
        LoadingStream loadingStream = this.stream;
        loadingStream.getClass();
        map.ifPresent(loadingStream::load);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<StreamStats> end() {
        this.running.set(false);
        this.lock.lock();
        try {
            this.changedRows.signalAll();
            completePendingTasks();
            return this.stream.end();
        } finally {
            this.lock.unlock();
        }
    }

    private Map<String, Set<String>> toTableChanges(Queue<DatabaseChangeEvent> queue) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        queue.forEach(databaseChangeEvent -> {
            rowsChanged(databaseChangeEvent).forEach((str, set) -> {
            });
        });
        return linkedHashMap;
    }

    private JsonObjectBuilder reduceEntries(Stream<Map.Entry<String, Set<String>>> stream) {
        return (JsonObjectBuilder) stream.reduce(Json.createObjectBuilder(), (jsonObjectBuilder, entry) -> {
            return jsonObjectBuilder.add((String) entry.getKey(), Json.createArrayBuilder((Collection) entry.getValue()));
        }, (v0, v1) -> {
            return v0.addAll(v1);
        });
    }

    private Map<String, Set<String>> rowsChanged(DatabaseChangeEvent databaseChangeEvent) {
        return (Map) tableChanges(databaseChangeEvent).parallelStream().map(this::rowChanges).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }, (set, set2) -> {
            return (Set) Stream.of((Object[]) new Set[]{set, set2}).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet());
        }));
    }

    private List<TableChangeDescription> tableChanges(DatabaseChangeEvent databaseChangeEvent) {
        return (List) Optional.ofNullable(databaseChangeEvent).map((v0) -> {
            return v0.getTableChangeDescription();
        }).map((v0) -> {
            return Arrays.asList(v0);
        }).orElse(Collections.emptyList());
    }

    private Map.Entry<String, Set<String>> rowChanges(TableChangeDescription tableChangeDescription) {
        return new AbstractMap.SimpleImmutableEntry(tableName(tableChangeDescription), (Set) rowChangeDescription(tableChangeDescription).stream().map((v0) -> {
            return v0.getRowid();
        }).map((v0) -> {
            return v0.stringValue();
        }).collect(Collectors.toSet()));
    }

    private EventPayload toPayload(JsonObject jsonObject) {
        return () -> {
            return jsonObject;
        };
    }

    private String tableName(TableChangeDescription tableChangeDescription) {
        return (String) Optional.ofNullable(tableChangeDescription).map((v0) -> {
            return v0.getTableName();
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Missing table name in description " + tableChangeDescription);
        });
    }

    private void removeCompleted() {
        this.lock.lock();
        try {
            this.pendingWrites.removeIf((v0) -> {
                return v0.isDone();
            });
        } finally {
            this.lock.unlock();
        }
    }

    private void completePendingTasks() {
        removeCompleted();
        this.lock.lock();
        try {
            long count = this.pendingWrites.parallelStream().filter(completableFuture -> {
                return !completableFuture.isDone();
            }).count();
            log.info("Waiting for {} Oracle writer task{} to complete", Long.valueOf(count), count == 1 ? "" : "s");
            while (!this.pendingWrites.isEmpty()) {
                this.lock.lock();
                try {
                    try {
                        this.changedRows.await(100L, TimeUnit.MILLISECONDS);
                        this.lock.unlock();
                    } catch (InterruptedException e) {
                        log.warn("Stopped with incomplete pending Oracle writer tasks");
                        this.pendingWrites.clear();
                        this.lock.unlock();
                    }
                    removeCompleted();
                } catch (Throwable th) {
                    throw th;
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    private List<RowChangeDescription> rowChangeDescription(TableChangeDescription tableChangeDescription) {
        return (List) Optional.ofNullable(tableChangeDescription).map((v0) -> {
            return v0.getRowChangeDescription();
        }).map((v0) -> {
            return Arrays.asList(v0);
        }).orElse(Collections.emptyList());
    }
}
