/*
 * Decompiled with CFR 0.152.
 */
package org.esbtools.eventhandler.lightblue;

import com.redhat.lightblue.client.LightblueClient;
import com.redhat.lightblue.client.LightblueException;
import com.redhat.lightblue.client.request.CRUDRequest;
import com.redhat.lightblue.client.request.DataBulkRequest;
import com.redhat.lightblue.client.request.LightblueDataRequest;
import com.redhat.lightblue.client.response.LightblueBulkDataResponse;
import com.redhat.lightblue.client.response.LightblueBulkResponseException;
import com.redhat.lightblue.client.response.LightblueDataResponse;
import com.redhat.lightblue.client.response.LightblueResponse;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.esbtools.eventhandler.DocumentEvent;
import org.esbtools.eventhandler.DocumentEventRepository;
import org.esbtools.eventhandler.FailedDocumentEvent;
import org.esbtools.eventhandler.lightblue.DocumentEventEntity;
import org.esbtools.eventhandler.lightblue.DocumentEventFactory;
import org.esbtools.eventhandler.lightblue.Identity;
import org.esbtools.eventhandler.lightblue.LightblueDocumentEvent;
import org.esbtools.eventhandler.lightblue.LightblueDocumentEventRepositoryConfig;
import org.esbtools.eventhandler.lightblue.ProcessingExpiredException;
import org.esbtools.eventhandler.lightblue.UnparseableDocumentEvent;
import org.esbtools.eventhandler.lightblue.client.BulkLightblueRequester;
import org.esbtools.eventhandler.lightblue.client.FindRequests;
import org.esbtools.eventhandler.lightblue.client.InsertRequests;
import org.esbtools.eventhandler.lightblue.client.LightblueErrors;
import org.esbtools.eventhandler.lightblue.client.LightblueRequester;
import org.esbtools.eventhandler.lightblue.client.UpdateRequests;
import org.esbtools.eventhandler.lightblue.locking.LockNotAvailableException;
import org.esbtools.eventhandler.lightblue.locking.LockStrategy;
import org.esbtools.eventhandler.lightblue.locking.Lockable;
import org.esbtools.eventhandler.lightblue.locking.LockedResource;
import org.esbtools.eventhandler.lightblue.locking.LockedResources;
import org.esbtools.eventhandler.lightblue.locking.LostLockException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LightblueDocumentEventRepository
implements DocumentEventRepository {
    private final LightblueClient lightblue;
    private final LightblueDocumentEventRepositoryConfig config;
    private final LockStrategy lockStrategy;
    private final Map<String, ? extends DocumentEventFactory> documentEventFactoriesByType;
    private final Clock clock;
    private final Set<String> supportedTypes;
    private final String[] supportedTypesArray;
    private static final Logger logger = LoggerFactory.getLogger(LightblueDocumentEventRepository.class);

    public LightblueDocumentEventRepository(LightblueClient lightblue, LockStrategy lockStrategy, LightblueDocumentEventRepositoryConfig config, Map<String, ? extends DocumentEventFactory> documentEventFactoriesByType, Clock clock) {
        this.lightblue = lightblue;
        this.lockStrategy = lockStrategy;
        this.config = config;
        this.documentEventFactoriesByType = documentEventFactoriesByType;
        this.clock = clock;
        this.supportedTypes = documentEventFactoriesByType.keySet();
        this.supportedTypesArray = this.supportedTypes.toArray(new String[this.supportedTypes.size()]);
    }

    /**
     * Inserts the given events as new document event entities.
     *
     * <p>The entities are sent in one or more insert requests. Each request carries at most the
     * configured maximum number of events per insert; when no maximum is configured everything
     * goes out in a single request.
     *
     * @param documentEvents events to insert; every element must be a {@link LightblueDocumentEvent}
     * @throws LightblueException if any insert request fails
     * @throws IllegalArgumentException if an element is not a {@link LightblueDocumentEvent}
     */
    public void addNewDocumentEvents(Collection<? extends DocumentEvent> documentEvents) throws LightblueException {
        if (documentEvents.isEmpty()) {
            return;
        }
        List<DocumentEventEntity> documentEventEntities = documentEvents.stream()
                .map(LightblueDocumentEventRepository::asEntity)
                .collect(Collectors.toList());
        int newEventsCount = documentEventEntities.size();
        int maxEventsPerInsert = this.config.getOptionalMaxDocumentEventsPerInsert().orElse(newEventsCount);
        if (maxEventsPerInsert <= 0) {
            // A zero batch size would never advance through the list and a negative one would
            // silently insert nothing, so fall back to a single request rather than lose events.
            logger.warn("Ignoring non-positive max document events per insert <{}>; inserting all {} new document events in a single request.", maxEventsPerInsert, newEventsCount);
            maxEventsPerInsert = newEventsCount;
        }
        int batchNumber = 0;
        int fromIndex = 0;
        while (fromIndex < newEventsCount) {
            // Computed from the remaining count so the upper bound cannot overflow.
            int toIndex = fromIndex + Math.min(maxEventsPerInsert, newEventsCount - fromIndex);
            List<DocumentEventEntity> entitiesInBatch = documentEventEntities.subList(fromIndex, toIndex);
            logger.debug("Inserting batch #{} of new document events from {} to {}.", batchNumber, fromIndex, toIndex);
            this.lightblue.data((LightblueDataRequest)InsertRequests.documentEventsReturningOnlyIds(entitiesInBatch));
            fromIndex = toIndex;
            ++batchNumber;
        }
    }

    /**
     * Retrieves the highest priority document events that are ready to be processed, for at most
     * {@code maxEvents} identities.
     *
     * <p>A batch of candidate entities is fetched for the supported and enabled types. The
     * candidates are grouped and optimized per identity under a lock, and the resulting status
     * changes are persisted before the events are returned. The locks are released when this
     * method returns.
     *
     * @param maxEvents upper bound requested by the caller; zero or a negative value yields an
     *        empty list without contacting Lightblue
     * @return the events that were successfully persisted for processing; empty when nothing is
     *         enabled, the configured batch size is null or 0, or no candidates are found
     * @throws Exception if querying or persisting fails, or releasing the locks fails
     */
    public List<LightblueDocumentEvent> retrievePriorityDocumentEventsUpTo(int maxEvents) throws Exception {
        String[] typesToProcess = this.getSupportedAndEnabledEventTypes();
        Integer documentEventsBatchSize = this.config.getDocumentEventsBatchSize();
        Duration processingTimeout = this.config.getDocumentEventProcessingTimeout();
        if (typesToProcess.length == 0 || documentEventsBatchSize == null || documentEventsBatchSize == 0) {
            logger.info("Not retrieving any document events because either there are no enabled or supported types to process or documentEventBatchSize is 0. Supported types are {}. Of those, enabled types are {}. Document event batch size is {}.", this.supportedTypes, Arrays.toString(typesToProcess), documentEventsBatchSize);
            return Collections.emptyList();
        }
        // A negative bound can never be matched by an equality check further down, which would
        // turn "up to a negative number" into "unbounded"; treat it like zero instead.
        if (maxEvents <= 0) {
            return Collections.emptyList();
        }
        // Passed to the find request as the cutoff derived from the processing timeout.
        Instant processingExpiredBefore = this.clock.instant().minus(processingTimeout);
        DocumentEventEntity[] documentEventEntities = this.lightblue
                .data((LightblueDataRequest)FindRequests.priorityDocumentEventsForTypesUpTo(typesToProcess, documentEventsBatchSize, processingExpiredBefore))
                .parseProcessed(DocumentEventEntity[].class);
        if (documentEventEntities.length == 0) {
            return Collections.emptyList();
        }
        try (LockedResources<SharedIdentityEvents> eventLocks = SharedIdentityEvents.parseAndOptimizeLockableDocumentEventEntities(maxEvents, documentEventEntities, new BulkLightblueRequester(this.lightblue), this.documentEventFactoriesByType, this.lockStrategy, this.clock)) {
            return this.persistNewEventsAndStatusUpdatesToExisting(eventLocks);
        }
    }

    /**
     * Verifies that the processing window of the given event has not run out yet.
     *
     * <p>The window ends at the event's processing date plus the configured processing timeout,
     * minus the configured expire threshold.
     *
     * @param event the event being processed; must be a {@link LightblueDocumentEvent}
     * @throws IllegalArgumentException if {@code event} is not a {@link LightblueDocumentEvent}
     * @throws ProcessingExpiredException if the current instant is after the end of the window
     */
    public void ensureTransactionActive(DocumentEvent event) throws Exception {
        // asEntity performs the same type check and throws the same IllegalArgumentException.
        DocumentEventEntity entity = LightblueDocumentEventRepository.asEntity(event);
        Duration timeout = this.config.getDocumentEventProcessingTimeout();
        Duration threshold = this.config.getDocumentEventExpireThreshold();
        Instant startedProcessing = entity.getProcessingDate().toInstant();
        Instant deadline = startedProcessing.plus(timeout).minus(threshold);
        Instant now = this.clock.instant();
        if (!now.isAfter(deadline)) {
            return;
        }
        throw new ProcessingExpiredException(event, timeout, threshold);
    }

    /**
     * Stamps the given events with a processed date and a final status, then persists the change
     * in one bulk request.
     *
     * <p>Events in {@code documentEvents} become {@code published}; events wrapped by
     * {@code failures} become {@code failed}. Nothing is sent when there is nothing to update.
     *
     * @param documentEvents events that were published successfully
     * @param failures events that failed, wrapped with their failure
     * @throws LightblueException if the bulk update fails
     * @throws IllegalArgumentException if any event is not a {@link LightblueDocumentEvent}
     */
    public void markDocumentEventsPublishedOrFailed(Collection<? extends DocumentEvent> documentEvents, Collection<FailedDocumentEvent> failures) throws LightblueException {
        List<DocumentEventEntity> publishedEntities = new ArrayList<DocumentEventEntity>();
        for (DocumentEvent publishedEvent : documentEvents) {
            DocumentEventEntity entity = LightblueDocumentEventRepository.asEntity(publishedEvent);
            entity.setProcessedDate(ZonedDateTime.now(this.clock));
            entity.setStatus(DocumentEventEntity.Status.published);
            publishedEntities.add(entity);
        }

        List<DocumentEventEntity> failedEntities = new ArrayList<DocumentEventEntity>();
        for (FailedDocumentEvent failure : failures) {
            DocumentEventEntity entity = LightblueDocumentEventRepository.asEntity(failure.documentEvent());
            entity.setProcessedDate(ZonedDateTime.now(this.clock));
            entity.setStatus(DocumentEventEntity.Status.failed);
            failedEntities.add(entity);
        }

        DataBulkRequest statusUpdates = new DataBulkRequest();
        statusUpdates.addAll(UpdateRequests.documentEventsStatusAndProcessedDate(publishedEntities));
        statusUpdates.addAll(UpdateRequests.documentEventsStatusAndProcessedDate(failedEntities));
        if (!statusUpdates.getRequests().isEmpty()) {
            this.lightblue.bulkData(statusUpdates);
        }
    }

    /**
     * Computes the canonical types that are both supported by this repository and enabled in the
     * configuration.
     *
     * @return an empty array when the configuration has no set of types; the pre-built array of
     *         all supported types when every supported type is enabled; otherwise a new array
     *         holding the supported types that are also enabled
     */
    private String[] getSupportedAndEnabledEventTypes() {
        Set<String> enabledTypes = this.config.getCanonicalTypesToProcess();
        if (enabledTypes == null) {
            return new String[0];
        }
        // Fast path: nothing is filtered out, so the array built in the constructor is reused.
        boolean everySupportedTypeEnabled = enabledTypes.containsAll(this.supportedTypes);
        if (everySupportedTypeEnabled) {
            return this.supportedTypesArray;
        }
        return this.supportedTypes.stream()
                .filter(enabledTypes::contains)
                .toArray(String[]::new);
    }

    /**
     * Persists the inserts and status updates collected for every identity whose lock is still
     * held, and returns the events that are now safely marked as processing.
     *
     * <p>One request is queued per event, in the same order the events are remembered, so the
     * bulk responses can be matched back to their events by position. An event is dropped from
     * the result when its request came back with errors, when it modified nothing, or when its
     * status is not {@code processing}. Events inserted for the first time get their generated
     * id copied back onto the entity.
     *
     * @param identityLocks locks whose resources carry the pending updates per identity
     * @return the events that were persisted and are in {@code processing} status
     * @throws LightblueException if the bulk request fails in a way that yields no bulk response
     * @throws IllegalStateException if fewer responses than requests come back
     */
    private List<LightblueDocumentEvent> persistNewEventsAndStatusUpdatesToExisting(LockedResources<SharedIdentityEvents> identityLocks) throws LightblueException {
        if (identityLocks.getLocks().isEmpty()) {
            return Collections.emptyList();
        }
        DataBulkRequest insertAndUpdateEvents = new DataBulkRequest();
        List<LightblueDocumentEvent> savedEvents = new ArrayList<>();
        for (LockedResource<SharedIdentityEvents> identityLock : identityLocks.getLocks()) {
            try {
                identityLock.ensureAcquiredOrThrow("Won't update status or process event.");
            }
            catch (LostLockException e) {
                logger.warn("Lost lock. This is not fatal. See exception for details.", e);
                continue;
            }
            SharedIdentityEvents lockedEvents = identityLock.getResource();
            for (DocumentEventUpdate update : lockedEvents.updates.values()) {
                LightblueDocumentEvent event = update.event;
                DocumentEventEntity entity = event.wrappedDocumentEventEntity();
                if (entity.get_id() == null) {
                    // Never stored before: only worth inserting if it is going to be processed.
                    if (!entity.getStatus().equals(DocumentEventEntity.Status.processing)) {
                        continue;
                    }
                    insertAndUpdateEvents.add((CRUDRequest)InsertRequests.documentEventsReturningOnlyIds(entity));
                    savedEvents.add(event);
                    continue;
                }
                // The original processing date is handed to the "IfCurrent" update request; a
                // modified count of 0 on the response is handled further down.
                insertAndUpdateEvents.add((CRUDRequest)UpdateRequests.documentEventStatusDatesAndSurvivorOfIfCurrent(entity, update.originalProcessingDate));
                savedEvents.add(event);
            }
        }
        // Every lock was lost or nothing qualified: don't send an empty bulk request.
        if (savedEvents.isEmpty()) {
            return savedEvents;
        }
        LightblueBulkDataResponse bulkResponse;
        try {
            bulkResponse = this.lightblue.bulkData(insertAndUpdateEvents);
        }
        catch (LightblueBulkResponseException e) {
            // Individual failures are inspected per response below.
            bulkResponse = e.getBulkResponse();
        }
        Iterator<LightblueDocumentEvent> eventsIterator = savedEvents.iterator();
        Iterator<?> responsesIterator = bulkResponse.getResponses().iterator();
        while (eventsIterator.hasNext()) {
            if (!responsesIterator.hasNext()) {
                throw new IllegalStateException("Mismatched number of requests and responses! Events looked like: <" + savedEvents + ">. Responses looked like: <" + bulkResponse.getResponses() + ">");
            }
            LightblueDataResponse response = (LightblueDataResponse)responsesIterator.next();
            LightblueDocumentEvent event = eventsIterator.next();
            DocumentEventEntity entity = event.wrappedDocumentEventEntity();
            if (LightblueErrors.arePresentInResponse((LightblueResponse)response)) {
                if (logger.isWarnEnabled()) {
                    List<String> errorStrings = LightblueErrors.toStringsFromErrorResponse((LightblueResponse)response);
                    logger.warn("Event update failed. Will not process. Event was: <{}>. Errors: <{}>", (Object)event, errorStrings);
                }
                eventsIterator.remove();
                continue;
            }
            if (response.parseModifiedCount() == 0) {
                logger.warn("Event updated by another thread. Will not process. Document event id: {}", (Object)entity.get_id());
                eventsIterator.remove();
                continue;
            }
            if (!entity.getStatus().equals(DocumentEventEntity.Status.processing)) {
                // Persisted (e.g. as superseded or merged) but not something to hand to callers.
                eventsIterator.remove();
                continue;
            }
            if (entity.get_id() != null) {
                continue;
            }
            // Freshly inserted event: adopt the id that came back in the response.
            DocumentEventEntity processed = response.parseProcessed(DocumentEventEntity.class);
            entity.set_id(processed.get_id());
        }
        return savedEvents;
    }

    /**
     * Unwraps the Lightblue entity behind a document event.
     *
     * @param event the event to unwrap
     * @return the entity wrapped by {@code event}
     * @throws IllegalArgumentException if {@code event} is not a {@link LightblueDocumentEvent}
     */
    private static DocumentEventEntity asEntity(DocumentEvent event) {
        if (!(event instanceof LightblueDocumentEvent)) {
            throw new IllegalArgumentException("Unknown event type. Only LightblueDocumentEvent is supported. Event type was: " + event.getClass());
        }
        LightblueDocumentEvent lightblueEvent = (LightblueDocumentEvent)event;
        return lightblueEvent.wrappedDocumentEventEntity();
    }

    /**
     * Builds a short label for log messages: the entity's id when it has one, otherwise the ids
     * it is a survivor of.
     *
     * @param eventEntity the entity to describe
     * @return {@code <id=...>} or {@code <survivorOfIds=...>}
     */
    static String identify(DocumentEventEntity eventEntity) {
        if (eventEntity.get_id() == null) {
            return "<survivorOfIds=" + eventEntity.getSurvivorOfIds() + ">";
        }
        return "<id=" + eventEntity.get_id() + ">";
    }

    /**
     * Pairs an event with the processing date its entity carried before it was re-stamped.
     */
    static class DocumentEventUpdate {
        /** Processing date read from the entity before the new one was set; may be null. */
        @Nullable
        final ZonedDateTime originalProcessingDate;
        /** The event whose entity was stamped. */
        final LightblueDocumentEvent event;

        private DocumentEventUpdate(@Nullable ZonedDateTime originalProcessingDate, LightblueDocumentEvent event) {
            this.originalProcessingDate = originalProcessingDate;
            this.event = event;
        }

        /**
         * Stamps the event's entity with the current time and remembers its previous processing
         * date.
         *
         * <p>The processing date is always set to now. The processed date is set to the same
         * instant only when the entity's status is {@code superseded}, {@code merged} or
         * {@code published}.
         *
         * @param event the event whose entity is stamped in place
         * @param clock clock that supplies the current time
         * @return an update holding the event and its previous processing date
         */
        static DocumentEventUpdate timestamp(LightblueDocumentEvent event, Clock clock) {
            DocumentEventEntity stamped = event.wrappedDocumentEventEntity();
            DocumentEventEntity.Status status = stamped.getStatus();
            ZonedDateTime previousProcessingDate = stamped.getProcessingDate();
            ZonedDateTime stampTime = ZonedDateTime.now(clock);
            stamped.setProcessingDate(stampTime);

            boolean superseded = DocumentEventEntity.Status.superseded.equals(status);
            boolean merged = DocumentEventEntity.Status.merged.equals(status);
            boolean published = DocumentEventEntity.Status.published.equals(status);
            if (superseded || merged || published) {
                stamped.setProcessedDate(stampTime);
            }
            return new DocumentEventUpdate(previousProcessingDate, event);
        }
    }

    static class SharedIdentityEvents
    implements Lockable {
        final Identity identity;
        final Map<LightblueDocumentEvent, DocumentEventUpdate> updates = new IdentityHashMap<LightblueDocumentEvent, DocumentEventUpdate>();
        private final Optional<LockedResource<SharedIdentityEvents>> lock;
        private final List<LightblueDocumentEvent> optimized = new ArrayList<LightblueDocumentEvent>();
        private final Clock clock;

        /**
         * Parses the entities into events, groups them by identity, and tries to lock each new
         * identity as it is first seen.
         *
         * <p>An entity that cannot be parsed, or whose canonical type has no registered factory,
         * is represented by an {@link UnparseableDocumentEvent}. Once {@code maxIdentities} locks
         * are held, events for identities not seen before are skipped. Events of identities whose
         * lock could not be acquired are ignored by the batch.
         *
         * <p>If anything fails unexpectedly while building the batches, the locks acquired so far
         * are released before the failure is rethrown.
         *
         * @param maxIdentities maximum number of identity locks to acquire; a non-positive value
         *        acquires none
         * @param entities candidate entities, in the order they should be considered
         * @param requester requester handed to the factories when creating events
         * @param documentEventFactoriesByType factories keyed by canonical type
         * @param lockStrategy strategy used to lock each identity
         * @param clock clock used for timestamps applied while optimizing
         * @return the acquired locks, each guarding the batch of events for one identity
         */
        static LockedResources<SharedIdentityEvents> parseAndOptimizeLockableDocumentEventEntities(int maxIdentities, DocumentEventEntity[] entities, LightblueRequester requester, Map<String, ? extends DocumentEventFactory> documentEventFactoriesByType, LockStrategy lockStrategy, Clock clock) {
            Map<Identity, SharedIdentityEvents> docEventsByIdentity = new HashMap<>();
            List<LockedResource<SharedIdentityEvents>> locksAcquired = new ArrayList<>();
            try {
                for (DocumentEventEntity eventEntity : entities) {
                    LightblueDocumentEvent newEvent;
                    String typeOfEvent = eventEntity.getCanonicalType();
                    DocumentEventFactory eventFactoryForType = documentEventFactoriesByType.get(typeOfEvent);
                    try {
                        if (eventFactoryForType == null) {
                            // Report the real cause instead of relying on a NullPointerException.
                            throw new IllegalStateException("No document event factory registered for canonical type: " + typeOfEvent);
                        }
                        newEvent = eventFactoryForType.getDocumentEventForEntity(eventEntity, requester);
                    }
                    catch (Exception e) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Failed to parse event entity: " + eventEntity, e);
                        }
                        newEvent = new UnparseableDocumentEvent(e, eventEntity);
                    }
                    Identity identity = newEvent.identity();
                    SharedIdentityEvents eventBatch = docEventsByIdentity.get(identity);
                    if (eventBatch == null) {
                        // Cap reached: events of identities we have not started on are left alone.
                        if (locksAcquired.size() >= maxIdentities) {
                            continue;
                        }
                        eventBatch = new SharedIdentityEvents(lockStrategy, identity, clock);
                        docEventsByIdentity.put(identity, eventBatch);
                        if (eventBatch.lock.isPresent()) {
                            locksAcquired.add(eventBatch.lock.get());
                            logger.debug("Acquired lock for resource {}", (Object)eventBatch.getResourceId());
                        }
                    }
                    eventBatch.addEvent(newEvent);
                }
            }
            catch (RuntimeException | Error failure) {
                // The caller never receives the locks in this case, so release them here.
                try {
                    LockedResources.fromLocks(locksAcquired).close();
                }
                catch (Exception releaseFailure) {
                    failure.addSuppressed(releaseFailure);
                }
                throw failure;
            }
            return LockedResources.fromLocks(locksAcquired);
        }

        SharedIdentityEvents(LockStrategy lockStrategy, Identity identity, Clock clock) {
            Optional<LockedResource<SharedIdentityEvents>> lock;
            this.identity = identity;
            this.clock = clock;
            try {
                lock = Optional.of(lockStrategy.tryAcquire(this));
            }
            catch (LockNotAvailableException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Lock not available. This is not fatal. Assuming another thread is processing document events sharing identity: " + identity, (Throwable)e);
                }
                lock = Optional.empty();
            }
            this.lock = lock;
        }

        /**
         * Returns the resource id of this batch's identity.
         *
         * @return the value of {@code identity.getResourceId()}
         */
        @Override
        public String getResourceId() {
            String resourceId = this.identity.getResourceId();
            return resourceId;
        }

        /**
         * Describes this batch by its identity, pending updates and optimized events.
         */
        @Override
        public String toString() {
            StringBuilder description = new StringBuilder("SharedIdentityEvents{identity=");
            description.append(this.identity);
            description.append(", updates=").append(this.updates);
            description.append(", optimized=").append(this.optimized);
            description.append('}');
            return description.toString();
        }

        /**
         * Folds an event into this batch, reconciling it against the events kept so far in
         * {@code optimized}.
         *
         * <p>For each previously kept event, in order, one of three things can happen:
         * <ul>
         *   <li>the incoming event is superseded by the kept one: the kept event absorbs the
         *       incoming event's ids and the incoming event is dropped;</li>
         *   <li>the kept event is superseded by the incoming one: the kept event is removed from
         *       {@code optimized} and the incoming event absorbs its ids;</li>
         *   <li>the two can merge: both are marked {@code merged}, the kept one is removed from
         *       {@code optimized}, and the merger takes the incoming event's place for the rest
         *       of the pass.</li>
         * </ul>
         * Whatever is left at the end is marked {@code processing}, added to {@code optimized}
         * and recorded in {@code updates}.
         *
         * <p>Does nothing when this batch did not obtain its lock.
         *
         * @param event the event to add; its identity must equal this batch's identity
         * @throws IllegalArgumentException if the event's identity differs from this batch's
         */
        private void addEvent(LightblueDocumentEvent event) {
            if (!Objects.equals(event.identity(), this.identity)) {
                throw new IllegalArgumentException("Tried to add event to shared identity batch that didn't share the same identity.");
            }
            // No lock was acquired for this identity, so its events are not tracked at all.
            if (!this.lock.isPresent()) {
                return;
            }
            // Starts as the incoming event; replaced by the merger after a merge, or set to null
            // once the incoming event has been superseded.
            LightblueDocumentEvent newOrMergerEvent = event;
            DocumentEventEntity newOrMergerEventEntity = event.wrappedDocumentEventEntity();
            Iterator<LightblueDocumentEvent> optimizedIterator = this.optimized.iterator();
            while (optimizedIterator.hasNext()) {
                DocumentEventEntity previousEntity;
                LightblueDocumentEvent previouslyOptimizedEvent = optimizedIterator.next();
                // Case 1: the incoming event is made redundant by one we already keep.
                if (newOrMergerEvent.isSupersededBy(previouslyOptimizedEvent)) {
                    previousEntity = previouslyOptimizedEvent.wrappedDocumentEventEntity();
                    previousEntity.addSurvivorOfIds(newOrMergerEventEntity.getSurvivorOfIds());
                    // NOTE(review): get_id() may be null here (it is only null-checked on the next
                    // line), unlike the merge case below which guards before adding. Presumably
                    // addSurvivorOfIds tolerates null -- confirm.
                    previousEntity.addSurvivorOfIds(newOrMergerEventEntity.get_id());
                    // Only an event that has an id gets a status update recorded for it.
                    if (newOrMergerEventEntity.get_id() != null) {
                        newOrMergerEventEntity.setStatus(DocumentEventEntity.Status.superseded);
                        this.updates.put(newOrMergerEvent, DocumentEventUpdate.timestamp(newOrMergerEvent, this.clock));
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Event {} superseded by event {}", (Object)LightblueDocumentEventRepository.identify(newOrMergerEventEntity), (Object)LightblueDocumentEventRepository.identify(previousEntity));
                    }
                    // Its survivor ids now live on the superseding entity.
                    newOrMergerEventEntity.setSurvivorOfIds(null);
                    // Nothing left to add after the loop.
                    newOrMergerEvent = null;
                    break;
                }
                // Case 2: a kept event is made redundant by the incoming one.
                if (previouslyOptimizedEvent.isSupersededBy(newOrMergerEvent)) {
                    previousEntity = previouslyOptimizedEvent.wrappedDocumentEventEntity();
                    if (previousEntity.get_id() == null) {
                        // No id, so there is nothing to update for it: forget its pending update.
                        this.updates.remove(previouslyOptimizedEvent);
                    } else {
                        // Its entry in updates is left in place and now carries the new status.
                        previousEntity.setStatus(DocumentEventEntity.Status.superseded);
                        previousEntity.setProcessedDate(ZonedDateTime.now(this.clock));
                    }
                    optimizedIterator.remove();
                    // NOTE(review): get_id() may be null on this path too -- see note above.
                    newOrMergerEventEntity.addSurvivorOfIds(previousEntity.get_id());
                    newOrMergerEventEntity.addSurvivorOfIds(previousEntity.getSurvivorOfIds());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Event {} superseded by event {}", (Object)LightblueDocumentEventRepository.identify(previousEntity), (Object)LightblueDocumentEventRepository.identify(newOrMergerEventEntity));
                    }
                    previousEntity.setSurvivorOfIds(null);
                    // Keep comparing the incoming event against the remaining kept events.
                    continue;
                }
                // Case 3: neither supersedes the other; see whether they can be merged.
                if (!newOrMergerEvent.couldMergeWith(previouslyOptimizedEvent)) continue;
                previousEntity = previouslyOptimizedEvent.wrappedDocumentEventEntity();
                if (previousEntity.get_id() == null) {
                    // No id, so there is nothing to update for it: forget its pending update.
                    this.updates.remove(previouslyOptimizedEvent);
                } else {
                    previousEntity.setStatus(DocumentEventEntity.Status.merged);
                    previousEntity.setProcessedDate(ZonedDateTime.now(this.clock));
                }
                optimizedIterator.remove();
                newOrMergerEventEntity.setStatus(DocumentEventEntity.Status.merged);
                // Only an event that has an id gets a status update recorded for it.
                if (newOrMergerEventEntity.get_id() != null) {
                    this.updates.put(newOrMergerEvent, DocumentEventUpdate.timestamp(newOrMergerEvent, this.clock));
                }
                LightblueDocumentEvent merger = newOrMergerEvent.merge(previouslyOptimizedEvent);
                DocumentEventEntity mergerEntity = merger.wrappedDocumentEventEntity();
                // The merger records both sources' survivor ids, plus their own ids when present.
                mergerEntity.addSurvivorOfIds(previousEntity.getSurvivorOfIds());
                mergerEntity.addSurvivorOfIds(newOrMergerEventEntity.getSurvivorOfIds());
                if (previousEntity.get_id() != null) {
                    mergerEntity.addSurvivorOfIds(previousEntity.get_id());
                }
                if (newOrMergerEventEntity.get_id() != null) {
                    mergerEntity.addSurvivorOfIds(newOrMergerEventEntity.get_id());
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Events {} and {} merged into new event which now merges all of {}", new Object[]{LightblueDocumentEventRepository.identify(previousEntity), LightblueDocumentEventRepository.identify(newOrMergerEventEntity), LightblueDocumentEventRepository.identify(mergerEntity)});
                }
                newOrMergerEventEntity.setSurvivorOfIds(null);
                previousEntity.setSurvivorOfIds(null);
                // From here on the merger is what gets compared against the remaining kept events.
                newOrMergerEvent = merger;
                newOrMergerEventEntity = mergerEntity;
            }
            // The incoming event (or the merger that replaced it) was not superseded: keep it.
            if (newOrMergerEvent != null) {
                newOrMergerEventEntity.setStatus(DocumentEventEntity.Status.processing);
                this.optimized.add(newOrMergerEvent);
                this.updates.put(newOrMergerEvent, DocumentEventUpdate.timestamp(newOrMergerEvent, this.clock));
            }
        }
    }
}

