package io.vlingo.symbio.store.state.dynamodb;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import io.vlingo.actors.Actor;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.State;
import io.vlingo.symbio.store.dispatch.ConfirmDispatchedResultInterest;
import io.vlingo.symbio.store.dispatch.Dispatchable;
import io.vlingo.symbio.store.dispatch.Dispatcher;
import io.vlingo.symbio.store.dispatch.DispatcherControl;
import io.vlingo.symbio.store.state.dynamodb.adapters.RecordAdapter;
import io.vlingo.symbio.store.state.dynamodb.handlers.ConfirmDispatchableAsyncHandler;
import io.vlingo.symbio.store.state.dynamodb.handlers.DispatchAsyncHandler;
import java.time.Duration;
import java.time.LocalDateTime;

/* loaded from: input_file:io/vlingo/symbio/store/state/dynamodb/DynamoDBDispatcherControlActor.class */
public class DynamoDBDispatcherControlActor<RS extends State<?>> extends Actor implements DispatcherControl, Scheduled<Object> {
    public static final long DEFAULT_REDISPATCH_DELAY = 2000;
    private final Dispatcher<Dispatchable<Entry<?>, RS>> dispatcher;
    private final AmazonDynamoDBAsync dynamodb;
    private final RecordAdapter<RS> recordAdapter;
    private final long confirmationExpiration;
    private final Cancellable cancellable;

    public DynamoDBDispatcherControlActor(Dispatcher dispatcher, AmazonDynamoDBAsync amazonDynamoDBAsync, RecordAdapter<RS> recordAdapter, long j, long j2) {
        this.dispatcher = dispatcher;
        this.dynamodb = amazonDynamoDBAsync;
        this.recordAdapter = recordAdapter;
        this.confirmationExpiration = j2;
        this.cancellable = scheduler().schedule((Scheduled) selfAs(Scheduled.class), (Object) null, DEFAULT_REDISPATCH_DELAY, j);
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object obj) {
        dispatchUnconfirmed();
    }

    public void confirmDispatched(String str, ConfirmDispatchedResultInterest confirmDispatchedResultInterest) {
        this.dynamodb.deleteItemAsync(new DeleteItemRequest(DynamoDBStateActor.DISPATCHABLE_TABLE_NAME, this.recordAdapter.marshallForQuery(str)), new ConfirmDispatchableAsyncHandler(str, confirmDispatchedResultInterest));
    }

    public void dispatchUnconfirmed() {
        AmazonDynamoDBAsync amazonDynamoDBAsync = this.dynamodb;
        ScanRequest withLimit = new ScanRequest(DynamoDBStateActor.DISPATCHABLE_TABLE_NAME).withLimit(100);
        RecordAdapter<RS> recordAdapter = this.recordAdapter;
        recordAdapter.getClass();
        amazonDynamoDBAsync.scanAsync(withLimit, new DispatchAsyncHandler(recordAdapter::unmarshallDispatchable, this::doDispatch));
    }

    private Void doDispatch(Dispatchable<Entry<?>, RS> dispatchable) {
        if (Math.abs(Duration.between(dispatchable.createdOn(), LocalDateTime.now()).toMillis()) <= this.confirmationExpiration) {
            return null;
        }
        this.dispatcher.dispatch(dispatchable);
        return null;
    }

    public void stop() {
        if (this.cancellable != null) {
            this.cancellable.cancel();
        }
        super.stop();
    }
}
