package io.vlingo.xoom.symbio.store.journal.jdbc;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.ActorInstantiator;
import io.vlingo.xoom.actors.Definition;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.common.Failure;
import io.vlingo.xoom.common.Scheduled;
import io.vlingo.xoom.symbio.BaseEntry;
import io.vlingo.xoom.symbio.Entry;
import io.vlingo.xoom.symbio.EntryAdapterProvider;
import io.vlingo.xoom.symbio.Metadata;
import io.vlingo.xoom.symbio.Source;
import io.vlingo.xoom.symbio.State;
import io.vlingo.xoom.symbio.StateAdapterProvider;
import io.vlingo.xoom.symbio.store.Result;
import io.vlingo.xoom.symbio.store.StorageException;
import io.vlingo.xoom.symbio.store.common.jdbc.Configuration;
import io.vlingo.xoom.symbio.store.common.jdbc.DatabaseType;
import io.vlingo.xoom.symbio.store.journal.Journal;
import io.vlingo.xoom.symbio.store.journal.JournalReader;
import io.vlingo.xoom.symbio.store.journal.StreamReader;
import io.vlingo.xoom.symbio.store.journal.jdbc.JDBCJournalReaderActor;
import io.vlingo.xoom.symbio.store.journal.jdbc.JDBCStreamReaderActor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/* loaded from: input_file:io/vlingo/xoom/symbio/store/journal/jdbc/JDBCJournalActor.class */
/**
 * Actor-backed {@link Journal} keyed by {@code String} stream names that delegates every
 * append to a {@link JDBCJournalWriter} and hands out journal/stream readers that are
 * created once per name and then cached.
 *
 * <p>When built with a {@link JDBCJournalBatchWriter} the actor also schedules itself as a
 * {@link Scheduled} target so that each interval signal flushes the writer.
 */
public class JDBCJournalActor extends Actor implements Journal<String>, Scheduled<Object> {
    /**
     * Delay handed to the scheduler before the first flush signal of a batch-writing journal.
     * NOTE(review): the unit is whatever the scheduler expects (presumably milliseconds) — confirm.
     */
    private static final long INITIAL_FLUSH_DELAY = 5L;

    /** Writer that performs (or batches) the actual JDBC appends. */
    private final JDBCJournalWriter journalWriter;
    /** Adapts {@link Source} instances to journal entries. */
    private final EntryAdapterProvider entryAdapterProvider;
    /** Adapts snapshot objects to their raw {@link State} form. */
    private final StateAdapterProvider stateAdapterProvider;
    /** JDBC configuration shared with the readers created by this actor. */
    private final Configuration configuration;
    /** Database type taken from the configuration; only used in log messages here. */
    private final DatabaseType databaseType;
    /** Journal readers already created, keyed by the name they were requested under. */
    private final Map<String, JournalReader<BaseEntry.TextEntry>> journalReaders;
    /** Stream readers already created, keyed by the name they were requested under. */
    private final Map<String, StreamReader<String>> streamReaders;

    /**
     * Instantiator that builds a {@link JDBCJournalActor} around either an instant writer or a
     * batch writer with a flush interval.
     */
    public static class JDBCJournalActorInstantiator implements ActorInstantiator<JDBCJournalActor> {
        private static final long serialVersionUID = 2184177499416088762L;
        private final Configuration configuration;
        private final JDBCJournalWriter journalWriter;
        // Present only for the batch-writer variant; its presence selects the constructor used.
        // NOTE(review): java.util.Optional is not Serializable; if this instantiator is ever
        // Java-serialized (the serialVersionUID suggests it may be) this field would fail — confirm.
        private final Optional<Integer> timeBetweenFlushWrites;

        /**
         * Creates an instantiator for a journal that writes through an instant writer.
         *
         * @param configuration the JDBC configuration passed to the actor
         * @param jDBCJournalInstantWriter the writer passed to the actor
         */
        public JDBCJournalActorInstantiator(Configuration configuration, JDBCJournalInstantWriter jDBCJournalInstantWriter) {
            this.configuration = configuration;
            this.journalWriter = jDBCJournalInstantWriter;
            this.timeBetweenFlushWrites = Optional.empty();
        }

        /**
         * Creates an instantiator for a journal that writes through a batch writer.
         *
         * @param configuration the JDBC configuration passed to the actor
         * @param jDBCJournalBatchWriter the writer passed to the actor
         * @param i the interval between flush signals, passed to the actor's scheduler call
         */
        public JDBCJournalActorInstantiator(Configuration configuration, JDBCJournalBatchWriter jDBCJournalBatchWriter, int i) {
            this.configuration = configuration;
            this.journalWriter = jDBCJournalBatchWriter;
            this.timeBetweenFlushWrites = Optional.of(i);
        }

        /**
         * Builds the actor with the constructor matching the configured writer and gives the
         * writer the new actor's logger.
         *
         * @return the newly created actor
         * @throws IllegalStateException if construction fails for any reason; the cause is preserved
         */
        public JDBCJournalActor instantiate() {
            try {
                final JDBCJournalActor actor;
                if (this.timeBetweenFlushWrites.isPresent()) {
                    // Unbox explicitly: passing an Integer would make overload resolution pick the
                    // private (Configuration, JDBCJournalWriter, Object) constructor and skip scheduling.
                    final int flushInterval = this.timeBetweenFlushWrites.get().intValue();
                    actor = new JDBCJournalActor(this.configuration, (JDBCJournalBatchWriter) this.journalWriter, flushInterval);
                } else {
                    actor = new JDBCJournalActor(this.configuration, (JDBCJournalInstantWriter) this.journalWriter);
                }
                this.journalWriter.setLogger(actor.logger());
                return actor;
            } catch (Exception e) {
                throw new IllegalStateException("Could not instantiate JDBCJournalActor because: " + e.getMessage(), e);
            }
        }

        /**
         * Alias kept for callers of the decompiler-generated name; simply delegates to
         * {@link #instantiate()}.
         *
         * @return the newly created actor
         */
        public JDBCJournalActor m7instantiate() {
            return instantiate();
        }
    }

    /**
     * Shared initialization for both public constructors: stores collaborators, turns off
     * auto-commit on the configured connection and creates the journal tables.
     *
     * @param configuration the JDBC configuration
     * @param jDBCJournalWriter the writer that performs appends
     * @param obj unused; it only distinguishes this constructor's signature from the public ones
     * @throws Exception if the connection setup or table creation fails
     */
    private JDBCJournalActor(Configuration configuration, JDBCJournalWriter jDBCJournalWriter, Object obj) throws Exception {
        this.journalWriter = jDBCJournalWriter;
        this.configuration = configuration;
        this.databaseType = configuration.databaseType;
        this.entryAdapterProvider = EntryAdapterProvider.instance(stage().world());
        this.stateAdapterProvider = StateAdapterProvider.instance(stage().world());
        this.journalReaders = new HashMap<>();
        this.streamReaders = new HashMap<>();
        configuration.connection.setAutoCommit(false);
        JDBCQueries.queriesFor(configuration.connection).createTables();
    }

    /**
     * Creates a journal that appends through an instant writer; no flush schedule is set up.
     *
     * @param configuration the JDBC configuration
     * @param jDBCJournalInstantWriter the writer that performs appends
     * @throws Exception if the connection setup or table creation fails
     */
    public JDBCJournalActor(Configuration configuration, JDBCJournalInstantWriter jDBCJournalInstantWriter) throws Exception {
        this(configuration, jDBCJournalInstantWriter, (Object) null);
    }

    /**
     * Creates a journal that appends through a batch writer and schedules this actor to
     * receive repeating interval signals that flush the writer.
     *
     * @param configuration the JDBC configuration
     * @param jDBCJournalBatchWriter the writer that batches appends
     * @param i the interval between flush signals, passed straight to the scheduler
     * @throws Exception if the connection setup or table creation fails
     */
    @SuppressWarnings("unchecked") // selfAs(Scheduled.class) can only yield the raw Scheduled type
    public JDBCJournalActor(Configuration configuration, JDBCJournalBatchWriter jDBCJournalBatchWriter, int i) throws Exception {
        this(configuration, jDBCJournalBatchWriter, (Object) null);
        stage().scheduler().schedule(selfAs(Scheduled.class), null, INITIAL_FLUSH_DELAY, i);
    }

    /** Stops the writer first, then performs the inherited stop. */
    public void stop() {
        this.journalWriter.stop();
        super.stop();
    }

    /**
     * Adapts {@code source} to an entry and asks the writer to append it without a snapshot.
     * The writer's callback reports the outcome to {@code appendResultInterest}. If adaptation
     * fails, the interest is told a failure and an {@link IllegalArgumentException} propagates.
     */
    public <S, ST> void append(String str, int i, Source<S> source, Metadata metadata, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Entry<String> entry = asEntry(source, i, metadata,
                cause -> appendResultedInFailure(str, i, source, null, appendResultInterest, obj, cause));
        this.journalWriter.appendEntry(str, i, entry, Optional.empty(),
                outcome -> appendResultInterest.appendResultedIn(outcome, str, i, source, Optional.empty(), obj));
    }

    /**
     * Same as {@link #append} but also hands the writer the raw form of snapshot {@code st}
     * (or an empty optional when {@code st} is {@code null}).
     */
    public <S, ST> void appendWith(String str, int i, Source<S> source, Metadata metadata, ST st, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final Entry<String> entry = asEntry(source, i, metadata,
                cause -> appendResultedInFailure(str, i, source, st, appendResultInterest, obj, cause));
        this.journalWriter.appendEntry(str, i, entry, toState(str, st, i),
                outcome -> appendResultInterest.appendResultedIn(outcome, str, i, source, Optional.ofNullable(st), obj));
    }

    /**
     * Adapts every source in {@code list} to an entry (versions start at {@code i} and increase
     * by one per source) and asks the writer to append them without a snapshot. If any
     * adaptation fails, the interest is told a failure and an {@link IllegalArgumentException}
     * propagates.
     */
    public <S, ST> void appendAll(String str, int i, List<Source<S>> list, Metadata metadata, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final List<Entry<String>> entries = asEntries(list, i, metadata,
                cause -> appendAllResultedInFailure(str, i, list, null, appendResultInterest, obj, cause));
        this.journalWriter.appendEntries(str, i, entries, Optional.empty(),
                outcome -> appendResultInterest.appendAllResultedIn(outcome, str, i, list, Optional.empty(), obj));
    }

    /**
     * Same as {@link #appendAll} but also hands the writer the raw form of snapshot {@code st}
     * (or an empty optional when {@code st} is {@code null}).
     */
    public <S, ST> void appendAllWith(String str, int i, List<Source<S>> list, Metadata metadata, ST st, Journal.AppendResultInterest appendResultInterest, Object obj) {
        final List<Entry<String>> entries = asEntries(list, i, metadata,
                cause -> appendAllResultedInFailure(str, i, list, st, appendResultInterest, obj, cause));
        this.journalWriter.appendEntries(str, i, entries, toState(str, st, i),
                outcome -> appendResultInterest.appendAllResultedIn(outcome, str, i, list, Optional.ofNullable(st), obj));
    }

    /** Flushes the writer each time the scheduler signals; both arguments are ignored. */
    public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
        this.journalWriter.flush();
    }

    /**
     * Answers the journal reader cached under {@code str}, creating it as an actor on the
     * first request for that name.
     *
     * @param str the reader name; also used as the address prefix suffix and passed to the reader's instantiator
     * @return a completed outcome carrying the reader
     */
    @SuppressWarnings("unchecked") // actorFor(JournalReader.class, ...) yields the raw protocol type
    public Completes<JournalReader<? extends Entry<?>>> journalReader(String str) {
        final JournalReader<BaseEntry.TextEntry> reader = this.journalReaders.computeIfAbsent(str, name ->
                stage().actorFor(
                        JournalReader.class,
                        Definition.has(JDBCJournalReaderActor.class, new JDBCJournalReaderActor.JDBCJournalReaderInstantiator(this.configuration, name)),
                        stage().world().addressFactory().uniquePrefixedWith("eventJournalReader-" + name)));
        return completes().with(reader);
    }

    /**
     * Answers the stream reader cached under {@code str}, creating it as an actor on the
     * first request for that name.
     *
     * @param str the reader name; also used as the address prefix suffix
     * @return a completed outcome carrying the reader
     */
    @SuppressWarnings("unchecked") // actorFor(StreamReader.class, ...) yields the raw protocol type
    public Completes<StreamReader<String>> streamReader(String str) {
        final StreamReader<String> reader = this.streamReaders.computeIfAbsent(str, name ->
                stage().actorFor(
                        StreamReader.class,
                        Definition.has(JDBCStreamReaderActor.class, new JDBCStreamReaderActor.JDBCStreamReaderInstantiator(this.configuration)),
                        stage().world().addressFactory().uniquePrefixedWith("eventStreamReader-" + name)));
        return completes().with(reader);
    }

    /** Tells the interest that a single-source append failed, wrapping {@code cause} in a StorageException. */
    private <S, ST> void appendResultedInFailure(String streamName, int streamVersion, Source<S> source, ST snapshot, Journal.AppendResultInterest interest, Object object, Exception cause) {
        interest.appendResultedIn(
                Failure.of(new StorageException(Result.Failure, cause.getMessage(), cause)),
                streamName, streamVersion, source, Optional.ofNullable(snapshot), object);
    }

    /** Tells the interest that a multi-source append failed, wrapping {@code cause} in a StorageException. */
    private <S, ST> void appendAllResultedInFailure(String streamName, int streamVersion, List<Source<S>> sources, ST snapshot, Journal.AppendResultInterest interest, Object object, Exception cause) {
        interest.appendAllResultedIn(
                Failure.of(new StorageException(Result.Failure, cause.getMessage(), cause)),
                streamName, streamVersion, sources, Optional.ofNullable(snapshot), object);
    }

    /**
     * Adapts each source to an entry, assigning consecutive versions starting at
     * {@code fromStreamVersion}. Stops at the first source that cannot be adapted because
     * {@link #asEntry} throws.
     */
    private <S> List<Entry<String>> asEntries(List<Source<S>> sources, int fromStreamVersion, Metadata metadata, Consumer<Exception> whenFailed) {
        final List<Entry<String>> entries = new ArrayList<>(sources.size());
        int version = fromStreamVersion;
        for (final Source<S> source : sources) {
            entries.add(asEntry(source, version, metadata, whenFailed));
            version++;
        }
        return entries;
    }

    /**
     * Adapts one source to an entry. On failure the callback is invoked first, the error is
     * logged, and the cause is rethrown wrapped in an {@link IllegalArgumentException}.
     */
    private <S> Entry<String> asEntry(Source<S> source, int streamVersion, Metadata metadata, Consumer<Exception> whenFailed) {
        try {
            return this.entryAdapterProvider.asEntry(source, streamVersion, metadata);
        } catch (Exception e) {
            whenFailed.accept(e);
            logger().error("xoom-symbio-jdbc:journal-" + this.databaseType + ": Cannot adapt source to entry because: ", e);
            throw new IllegalArgumentException(e);
        }
    }

    /** Answers the raw state for {@code snapshot}, or an empty optional when it is {@code null}. */
    private <ST> Optional<State.TextState> toState(String streamName, ST snapshot, int streamVersion) {
        if (snapshot == null) {
            return Optional.empty();
        }
        return Optional.of(this.stateAdapterProvider.asRaw(streamName, snapshot, streamVersion));
    }
}
