/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.symbio.store.state.dynamodb;

import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import io.vlingo.actors.Actor;
import io.vlingo.common.Cancellable;
import io.vlingo.common.Scheduled;
import io.vlingo.symbio.Entry;
import io.vlingo.symbio.State;
import io.vlingo.symbio.store.dispatch.ConfirmDispatchedResultInterest;
import io.vlingo.symbio.store.dispatch.Dispatchable;
import io.vlingo.symbio.store.dispatch.Dispatcher;
import io.vlingo.symbio.store.dispatch.DispatcherControl;
import io.vlingo.symbio.store.state.dynamodb.adapters.RecordAdapter;
import io.vlingo.symbio.store.state.dynamodb.handlers.ConfirmDispatchableAsyncHandler;
import io.vlingo.symbio.store.state.dynamodb.handlers.DispatchAsyncHandler;
import java.time.Duration;
import java.time.LocalDateTime;

public class DynamoDBDispatcherControlActor<RS extends State<?>>
extends Actor
implements DispatcherControl,
Scheduled<Object> {
    public static final long DEFAULT_REDISPATCH_DELAY = 2000L;
    private final Dispatcher<Dispatchable<Entry<?>, RS>> dispatcher;
    private final AmazonDynamoDBAsync dynamodb;
    private final RecordAdapter<RS> recordAdapter;
    private final long confirmationExpiration;
    private final Cancellable cancellable;

    public DynamoDBDispatcherControlActor(Dispatcher dispatcher, AmazonDynamoDBAsync dynamodb, RecordAdapter<RS> recordAdapter, long checkConfirmationExpirationInterval, long confirmationExpiration) {
        this.dispatcher = dispatcher;
        this.dynamodb = dynamodb;
        this.recordAdapter = recordAdapter;
        this.confirmationExpiration = confirmationExpiration;
        this.cancellable = this.scheduler().schedule((Scheduled)this.selfAs(Scheduled.class), null, 2000L, checkConfirmationExpirationInterval);
    }

    public void intervalSignal(Scheduled<Object> scheduled, Object data) {
        this.dispatchUnconfirmed();
    }

    public void confirmDispatched(String dispatchId, ConfirmDispatchedResultInterest interest) {
        this.dynamodb.deleteItemAsync(new DeleteItemRequest("vlingo_dispatchables", this.recordAdapter.marshallForQuery(dispatchId)), (AsyncHandler)new ConfirmDispatchableAsyncHandler(dispatchId, interest));
    }

    public void dispatchUnconfirmed() {
        this.dynamodb.scanAsync(new ScanRequest("vlingo_dispatchables").withLimit(Integer.valueOf(100)), new DispatchAsyncHandler(this.recordAdapter::unmarshallDispatchable, this::doDispatch));
    }

    private Void doDispatch(Dispatchable<Entry<?>, RS> dispatchable) {
        Duration duration = Duration.between(dispatchable.createdOn(), LocalDateTime.now());
        if (Math.abs(duration.toMillis()) > this.confirmationExpiration) {
            this.dispatcher.dispatch(dispatchable);
        }
        return null;
    }

    public void stop() {
        if (this.cancellable != null) {
            this.cancellable.cancel();
        }
        super.stop();
    }
}

