/*
 * Decompiled with CFR 0.152.
 */
package cz.o2.proxima.direct.transaction;

import cz.o2.proxima.annotations.DeclaredThreadSafe;
import cz.o2.proxima.direct.commitlog.CommitLogObserver;
import cz.o2.proxima.direct.commitlog.CommitLogObservers;
import cz.o2.proxima.direct.commitlog.CommitLogReader;
import cz.o2.proxima.direct.commitlog.ObserveHandle;
import cz.o2.proxima.direct.commitlog.ObserveHandleUtils;
import cz.o2.proxima.direct.core.AttributeWriterBase;
import cz.o2.proxima.direct.core.CommitCallback;
import cz.o2.proxima.direct.core.DirectAttributeFamilyDescriptor;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.transaction.ClientTransactionManager;
import cz.o2.proxima.direct.transaction.ServerTransactionManager;
import cz.o2.proxima.direct.transaction.ThreadPooledObserver;
import cz.o2.proxima.direct.view.CachedView;
import cz.o2.proxima.functional.BiConsumer;
import cz.o2.proxima.internal.shaded.com.google.common.annotations.VisibleForTesting;
import cz.o2.proxima.internal.shaded.com.google.common.base.Preconditions;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Lists;
import cz.o2.proxima.internal.shaded.com.google.common.collect.Sets;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.AttributeFamilyDescriptor;
import cz.o2.proxima.repository.EntityAwareAttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.TransactionMode;
import cz.o2.proxima.storage.Partition;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.transaction.Commit;
import cz.o2.proxima.transaction.KeyAttribute;
import cz.o2.proxima.transaction.Request;
import cz.o2.proxima.transaction.Response;
import cz.o2.proxima.transaction.State;
import cz.o2.proxima.util.Classpath;
import cz.o2.proxima.util.ExceptionUtils;
import cz.o2.proxima.util.Optionals;
import cz.o2.proxima.util.Pair;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class TransactionResourceManager
implements ClientTransactionManager,
ServerTransactionManager {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransactionResourceManager.class);
    private final DirectDataOperator direct;
    private final EntityDescriptor transaction;
    private final EntityAwareAttributeDescriptor.Wildcard<Request> requestDesc;
    private final EntityAwareAttributeDescriptor.Wildcard<Response> responseDesc;
    private final EntityAwareAttributeDescriptor.Regular<State> stateDesc;
    private final EntityAwareAttributeDescriptor.Regular<Commit> commitDesc;
    private final Map<String, CachedTransaction> openTransactionMap = new ConcurrentHashMap<String, CachedTransaction>();
    private final Map<AttributeFamilyDescriptor, CachedWriters> cachedAccessors = new ConcurrentHashMap<AttributeFamilyDescriptor, CachedWriters>();
    private final Map<DirectAttributeFamilyDescriptor, ObserveHandle> serverObservedFamilies = new ConcurrentHashMap<DirectAttributeFamilyDescriptor, ObserveHandle>();
    private final Map<DirectAttributeFamilyDescriptor, HandleWithAssignment> clientObservedFamilies = new ConcurrentHashMap<DirectAttributeFamilyDescriptor, HandleWithAssignment>();
    private final Map<DirectAttributeFamilyDescriptor, CachedView> stateViews = new ConcurrentHashMap<DirectAttributeFamilyDescriptor, CachedView>();
    private final Map<String, BiConsumer<String, Response>> transactionResponseConsumers = new ConcurrentHashMap<String, BiConsumer<String, Response>>();
    private final TransactionConfig cfg = new TransactionConfig();
    private final Map<AttributeFamilyDescriptor, AtomicBoolean> activeForFamily = new ConcurrentHashMap<AttributeFamilyDescriptor, AtomicBoolean>();
    private long transactionTimeoutMs;
    private long cleanupIntervalMs;
    private ServerTransactionManager.InitialSequenceIdPolicy initialSequenceIdPolicy;

    @VisibleForTesting
    static TransactionResourceManager create(DirectDataOperator direct) {
        return new TransactionResourceManager(direct, Collections.emptyMap());
    }

    @VisibleForTesting
    public TransactionResourceManager(DirectDataOperator direct, Map<String, Object> cfg) {
        this.direct = direct;
        this.transaction = direct.getRepository().getEntity("_transaction");
        this.requestDesc = EntityAwareAttributeDescriptor.Wildcard.of((EntityDescriptor)this.transaction, (AttributeDescriptor)this.transaction.getAttribute("request.*"));
        this.responseDesc = EntityAwareAttributeDescriptor.Wildcard.of((EntityDescriptor)this.transaction, (AttributeDescriptor)this.transaction.getAttribute("response.*"));
        this.stateDesc = EntityAwareAttributeDescriptor.Regular.of((EntityDescriptor)this.transaction, (AttributeDescriptor)this.transaction.getAttribute("state"));
        this.commitDesc = EntityAwareAttributeDescriptor.Regular.of((EntityDescriptor)this.transaction, (AttributeDescriptor)this.transaction.getAttribute("commit"));
        this.transactionTimeoutMs = TransactionResourceManager.getTransactionTimeout(cfg);
        this.cleanupIntervalMs = TransactionResourceManager.getCleanupInterval(cfg);
        this.initialSequenceIdPolicy = TransactionResourceManager.getInitialSequenceIdPolicy(cfg);
        log.info("Created {} with transaction timeout {} ms", (Object)this.getClass().getSimpleName(), (Object)this.transactionTimeoutMs);
    }

    @VisibleForTesting
    public void setTransactionTimeoutMs(long timeoutMs) {
        this.transactionTimeoutMs = timeoutMs;
    }

    private static long getTransactionTimeout(Map<String, Object> cfg) {
        return Optional.ofNullable(cfg.get("timeout")).map(Object::toString).map(Long::parseLong).orElse(3600000L);
    }

    private static long getCleanupInterval(Map<String, Object> cfg) {
        return Optional.ofNullable(cfg.get("cleanup-interval")).map(Object::toString).map(Long::parseLong).orElse(3600000L);
    }

    private static ServerTransactionManager.InitialSequenceIdPolicy getInitialSequenceIdPolicy(Map<String, Object> cfg) {
        return Optional.ofNullable(cfg.get("initial-sequence-id-policy")).map(Object::toString).map(c -> (ServerTransactionManager.InitialSequenceIdPolicy)Classpath.newInstance((String)c, ServerTransactionManager.InitialSequenceIdPolicy.class)).orElse(new ServerTransactionManager.InitialSequenceIdPolicy.Default());
    }

    @Override
    public void close() {
        this.openTransactionMap.forEach((k, v) -> v.close());
        this.cachedAccessors.forEach((k, v) -> v.close());
        this.stateViews.forEach((k, v) -> v.close());
        this.openTransactionMap.clear();
        this.cachedAccessors.clear();
        this.stateViews.clear();
        this.transactionResponseConsumers.clear();
        this.serverObservedFamilies.values().forEach(ObserveHandle::close);
        this.serverObservedFamilies.clear();
        this.clientObservedFamilies.values().forEach(p -> p.getObserveHandle().close());
        this.clientObservedFamilies.clear();
    }

    @Override
    public void houseKeeping() {
        long now = System.currentTimeMillis();
        long releaseTime = now - this.cleanupIntervalMs;
        this.openTransactionMap.entrySet().stream().filter(e -> ((CachedTransaction)e.getValue()).getCreated() < releaseTime).map(Map.Entry::getKey).collect(Collectors.toList()).forEach(this::release);
    }

    @Override
    public void runObservations(String name, BiConsumer<StreamElement, Pair<Long, Object>> updateConsumer, CommitLogObserver requestObserver) {
        CommitLogObserver effectiveObserver = TransactionResourceManager.isNotThreadSafe(requestObserver) ? requestObserver : new ThreadPooledObserver(this.direct.getContext().getExecutorService(), requestObserver, TransactionResourceManager.getDeclaredParallelism(requestObserver).orElse(Runtime.getRuntime().availableProcessors()));
        List families = this.direct.getRepository().getAllEntities().filter(EntityDescriptor::isTransactional).flatMap(e -> e.getAllAttributes().stream()).filter(a -> a.getTransactionMode() != TransactionMode.NONE).map(AttributeDescriptor::getTransactionalManagerFamilies).map(Sets::newHashSet).distinct().collect(Collectors.toList());
        CountDownLatch initializedLatch = new CountDownLatch(families.size());
        families.stream().map(this::toRequestStatePair).forEach(p -> {
            DirectAttributeFamilyDescriptor requestFamily = (DirectAttributeFamilyDescriptor)p.getFirst();
            DirectAttributeFamilyDescriptor stateFamily = (DirectAttributeFamilyDescriptor)p.getSecond();
            String consumerName = name + "-" + requestFamily.getDesc().getName();
            log.info("Starting to observe family {} with URI {} and associated state family {} as {}", new Object[]{requestFamily, requestFamily.getDesc().getStorageUri(), stateFamily, consumerName});
            CommitLogReader reader = (CommitLogReader)Optionals.get(requestFamily.getCommitLogReader());
            CachedView view = this.stateViews.get(stateFamily);
            if (view == null) {
                view = (CachedView)Optionals.get(stateFamily.getCachedView());
                Duration ttl = Duration.ofMillis(this.cleanupIntervalMs);
                this.stateViews.put(stateFamily, view);
                view.assign(view.getPartitions(), updateConsumer, ttl);
            }
            initializedLatch.countDown();
            this.serverObservedFamilies.put(requestFamily, reader.observe(consumerName, this.repartitionHookForBeingActive(stateFamily, reader.getPartitions().size(), effectiveObserver)));
        });
        ExceptionUtils.unchecked(initializedLatch::await);
    }

    @VisibleForTesting
    static boolean isNotThreadSafe(CommitLogObserver requestObserver) {
        return requestObserver.getClass().getDeclaredAnnotation(DeclaredThreadSafe.class) == null;
    }

    @VisibleForTesting
    static Optional<Integer> getDeclaredParallelism(CommitLogObserver requestObserver) {
        return Arrays.stream(requestObserver.getClass().getAnnotations()).filter(a -> a.annotationType().equals(DeclaredThreadSafe.class)).map(DeclaredThreadSafe.class::cast).findAny().map(DeclaredThreadSafe::allowedParallelism).filter(i -> i != -1);
    }

    private Pair<DirectAttributeFamilyDescriptor, DirectAttributeFamilyDescriptor> toRequestStatePair(Collection<String> families) {
        List candidates = families.stream().map(this.direct::getFamilyByName).filter(af -> af.getAttributes().contains(this.requestDesc) || af.getAttributes().contains(this.stateDesc)).collect(Collectors.toList());
        Preconditions.checkState((candidates.size() < 3 && !candidates.isEmpty() ? 1 : 0) != 0);
        if (candidates.size() == 1) {
            return Pair.of(candidates.get(0), candidates.get(0));
        }
        return ((DirectAttributeFamilyDescriptor)candidates.get(0)).getAttributes().contains(this.requestDesc) ? Pair.of(candidates.get(0), candidates.get(1)) : Pair.of(candidates.get(1), candidates.get(0));
    }

    private CommitLogObserver repartitionHookForBeingActive(final DirectAttributeFamilyDescriptor stateFamily, final int numPartitions, CommitLogObserver delegate) {
        this.activeForFamily.putIfAbsent(stateFamily.getDesc(), new AtomicBoolean());
        return new CommitLogObservers.ForwardingObserver(delegate){

            @Override
            public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                Preconditions.checkArgument((boolean)((AtomicBoolean)TransactionResourceManager.this.activeForFamily.get(stateFamily.getDesc())).get());
                if (ingest.getStamp() > System.currentTimeMillis() - 2L * TransactionResourceManager.this.transactionTimeoutMs) {
                    super.onNext(ingest, context);
                } else {
                    log.warn("Skipping request {} due to timeout. Current timeout specified as {}", (Object)ingest, (Object)TransactionResourceManager.this.transactionTimeoutMs);
                    context.confirm();
                }
                return true;
            }

            @Override
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                Preconditions.checkArgument((context.partitions().isEmpty() || context.partitions().size() == numPartitions ? 1 : 0) != 0, (String)"At least all or none partitions need to be assigned to the consumer. Got %s partitions from %s", (int)context.partitions().size(), (int)numPartitions);
                if (!context.partitions().isEmpty()) {
                    TransactionResourceManager.this.transitionToActive(stateFamily);
                } else {
                    TransactionResourceManager.this.transitionToInactive(stateFamily);
                }
                super.onRepartition(context);
            }
        };
    }

    private void transitionToActive(DirectAttributeFamilyDescriptor desc) {
        boolean isAtHead = false;
        while (!Thread.currentThread().isInterrupted() && !isAtHead) {
            CachedView view = Objects.requireNonNull(this.stateViews.get(desc));
            CommitLogReader reader = view.getUnderlyingReader();
            Optional<ObserveHandle> handle = view.getRunningHandle();
            if (handle.isPresent()) {
                isAtHead = ObserveHandleUtils.isAtHead(handle.get(), reader);
            }
            if (isAtHead) continue;
            ExceptionUtils.ignoringInterrupted((ExceptionUtils.ThrowingRunnable & Serializable)() -> TimeUnit.MILLISECONDS.sleep(100L));
        }
        if (isAtHead) {
            log.info("Transitioned to ACTIVE state for {}", (Object)desc);
            this.activeForFamily.get(desc.getDesc()).set(true);
        }
    }

    private void transitionToInactive(DirectAttributeFamilyDescriptor desc) {
        log.info("Transitioning to INACTIVE state for {}", (Object)desc);
        this.activeForFamily.get(desc.getDesc()).set(false);
    }

    private void addTransactionResponseConsumer(String transactionId, DirectAttributeFamilyDescriptor responseFamily, BiConsumer<String, Response> responseConsumer) {
        Preconditions.checkArgument((boolean)responseFamily.getAttributes().contains(this.responseDesc));
        this.transactionResponseConsumers.put(transactionId, responseConsumer);
        this.clientObservedFamilies.computeIfAbsent(responseFamily, k -> {
            log.debug("Starting to observe family {} with URI {}", k, (Object)k.getDesc().getStorageUri());
            HandleWithAssignment assignment = new HandleWithAssignment();
            assignment.withHandle(((CommitLogReader)Optionals.get(k.getCommitLogReader())).observe(this.newResponseObserverNameFor((DirectAttributeFamilyDescriptor)k), this.newTransactionResponseObserver(assignment)));
            return assignment;
        });
    }

    protected String newResponseObserverNameFor(DirectAttributeFamilyDescriptor k) {
        String localhost = "localhost";
        try {
            localhost = InetAddress.getLocalHost().getHostAddress();
        }
        catch (UnknownHostException e) {
            log.warn("Error getting name of localhost, using {} instead.", (Object)localhost, (Object)e);
        }
        return "transaction-response-observer-" + k.getDesc().getName() + (localhost.hashCode() & Integer.MAX_VALUE);
    }

    private CommitLogObserver newTransactionResponseObserver(final HandleWithAssignment assignment) {
        Preconditions.checkArgument((assignment != null ? 1 : 0) != 0);
        return new CommitLogObserver(){

            @Override
            public boolean onNext(StreamElement ingest, CommitLogObserver.OnNextContext context) {
                log.debug("Received transaction event {}", (Object)ingest);
                if (ingest.getAttributeDescriptor().equals(TransactionResourceManager.this.responseDesc)) {
                    String transactionId = ingest.getKey();
                    Optional response = TransactionResourceManager.this.responseDesc.valueOf(ingest);
                    BiConsumer consumer = (BiConsumer)TransactionResourceManager.this.transactionResponseConsumers.get(transactionId);
                    if (consumer != null) {
                        if (response.isPresent()) {
                            String suffix = TransactionResourceManager.this.responseDesc.extractSuffix(ingest.getAttribute());
                            consumer.accept((Object)suffix, response.get());
                        } else {
                            log.error("Failed to parse response from {}", (Object)ingest);
                        }
                    } else {
                        log.debug("Missing consumer for transaction {} processing response {}. Ignoring.", (Object)transactionId, response.orElse(null));
                    }
                }
                context.confirm();
                return true;
            }

            @Override
            public void onRepartition(CommitLogObserver.OnRepartitionContext context) {
                assignment.assign(context.partitions());
            }
        };
    }

    private synchronized CachedWriters getCachedAccessors(DirectAttributeFamilyDescriptor family) {
        return this.cachedAccessors.computeIfAbsent(family.getDesc(), k -> new CachedWriters(family));
    }

    @Override
    public void begin(String transactionId, BiConsumer<String, Response> responseConsumer, List<KeyAttribute> attributes) {
        CachedTransaction cachedTransaction = this.openTransactionMap.computeIfAbsent(transactionId, k -> new CachedTransaction(transactionId, attributes, responseConsumer));
        cachedTransaction.open(attributes);
    }

    @Override
    public void updateTransaction(String transactionId, List<KeyAttribute> newAttributes) {
        CachedTransaction cachedTransaction = this.openTransactionMap.get(transactionId);
        Preconditions.checkArgument((cachedTransaction != null ? 1 : 0) != 0, (String)"Transaction %s is not open", (Object)transactionId);
        cachedTransaction.update(newAttributes);
    }

    @Override
    public void commit(String transactionId, List<KeyAttribute> outputAttributes) {
        CachedTransaction cachedTransaction = this.openTransactionMap.get(transactionId);
        Preconditions.checkArgument((cachedTransaction != null ? 1 : 0) != 0, (String)"Transaction %s is not open", (Object)transactionId);
        cachedTransaction.commit(outputAttributes);
    }

    @Override
    public void rollback(String transactionId) {
        Optional.ofNullable(this.openTransactionMap.get(transactionId)).ifPresent(CachedTransaction::rollback);
    }

    @Override
    public void release(String transactionId) {
        Optional.ofNullable(this.openTransactionMap.remove(transactionId)).ifPresent(CachedTransaction::close);
        this.transactionResponseConsumers.remove(transactionId);
    }

    @Override
    public State getCurrentState(String transactionId) {
        CachedTransaction cachedTransaction = this.openTransactionMap.get(transactionId);
        if (cachedTransaction == null) {
            return State.empty();
        }
        return cachedTransaction.getState();
    }

    @Override
    public void ensureTransactionOpen(String transactionId, State state) {
        this.getOrCreateCachedTransaction(transactionId, state);
    }

    @VisibleForTesting
    CachedTransaction createCachedTransaction(String transactionId, State state) {
        return this.createCachedTransaction(transactionId, state, null);
    }

    @VisibleForTesting
    CachedTransaction createCachedTransaction(String transactionId, State state, BiConsumer<String, Response> responseConsumer) {
        Collection attributes;
        if (!state.getCommittedAttributes().isEmpty()) {
            HashSet committedSet = Sets.newHashSet((Iterable)state.getCommittedAttributes());
            committedSet.addAll(state.getInputAttributes());
            attributes = committedSet;
        } else {
            attributes = state.getInputAttributes();
        }
        return new CachedTransaction(transactionId, attributes, responseConsumer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void writeResponseAndUpdateState(String transactionId, State updateState, String responseId, Response response, CommitCallback callback) {
        CachedTransaction cachedTransaction = this.openTransactionMap.get(transactionId);
        if (cachedTransaction != null) {
            DirectAttributeFamilyDescriptor responseFamily = cachedTransaction.getResponseFamily();
            DirectAttributeFamilyDescriptor stateFamily = cachedTransaction.getStateFamily();
            OnlineAttributeWriter writer = cachedTransaction.getCommitWriter();
            CachedView stateView = cachedTransaction.getStateView();
            long now = System.currentTimeMillis();
            StreamElement stateUpsert = this.getStateDesc().upsert(transactionId, now, (Object)updateState);
            Commit commit = Commit.of(Arrays.asList(new Commit.TransactionUpdate(stateFamily.getDesc().getName(), stateUpsert), new Commit.TransactionUpdate(responseFamily.getDesc().getName(), this.getResponseDesc().upsert(transactionId, responseId, now, (Object)response))));
            OnlineAttributeWriter onlineAttributeWriter = stateView;
            synchronized (onlineAttributeWriter) {
                this.ensureTransactionOpen(transactionId, updateState);
                stateView.cache(stateUpsert);
            }
            onlineAttributeWriter = writer;
            synchronized (onlineAttributeWriter) {
                writer.write(this.getCommitDesc().upsert(transactionId, System.currentTimeMillis(), (Object)commit), callback);
            }
        }
        log.warn("Transaction {} is not open, don't have a writer to return response {}", (Object)transactionId, (Object)response);
        callback.commit(true, null);
    }

    private CachedTransaction getOrCreateCachedTransaction(String transactionId, State state) {
        return this.openTransactionMap.computeIfAbsent(transactionId, tmp -> this.createCachedTransaction(transactionId, state));
    }

    private Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> findFamilyForTransactionalAttribute(List<AttributeDescriptor<?>> attributes) {
        Preconditions.checkArgument((!attributes.isEmpty() ? 1 : 0) != 0, (Object)"Cannot return families for empty attribute list");
        TransactionMode mode = attributes.get(0).getTransactionMode();
        Preconditions.checkArgument((boolean)attributes.stream().allMatch(a -> a.getTransactionMode() == mode), (String)"All passed attributes must have the same transaction mode. Got attributes %s.", attributes);
        List candidates = attributes.stream().flatMap(a -> a.getTransactionalManagerFamilies().stream()).distinct().map(this.direct::findFamilyByName).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
        List requestResponseState = candidates.stream().flatMap(f -> f.getAttributes().stream().filter(a -> !a.equals(this.commitDesc))).sorted(Comparator.comparing(AttributeDescriptor::getName)).collect(Collectors.toList());
        Preconditions.checkState((boolean)requestResponseState.equals(Lists.newArrayList((Object[])new EntityAwareAttributeDescriptor[]{this.requestDesc, this.responseDesc, this.stateDesc})), (String)"Should have received only families for unique transactional attributes, got %s for %s with transactional mode %s", candidates, attributes, (Object)mode);
        Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> res = candidates.stream().flatMap(f -> f.getAttributes().stream().map(a -> Pair.of((Object)a, (Object)f))).collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
        this.direct.getRepository().getAllFamilies(true).filter(af -> af.getAttributes().contains(this.commitDesc)).forEach(af -> res.put((AttributeDescriptor<?>)this.commitDesc, this.direct.getFamilyByName(af.getName())));
        return res;
    }

    @Override
    @Generated
    public EntityDescriptor getTransaction() {
        return this.transaction;
    }

    @Override
    @Generated
    public EntityAwareAttributeDescriptor.Wildcard<Request> getRequestDesc() {
        return this.requestDesc;
    }

    @Override
    @Generated
    public EntityAwareAttributeDescriptor.Wildcard<Response> getResponseDesc() {
        return this.responseDesc;
    }

    @Override
    @Generated
    public EntityAwareAttributeDescriptor.Regular<State> getStateDesc() {
        return this.stateDesc;
    }

    @Override
    @Generated
    public EntityAwareAttributeDescriptor.Regular<Commit> getCommitDesc() {
        return this.commitDesc;
    }

    @Override
    @Generated
    public TransactionConfig getCfg() {
        return this.cfg;
    }

    @Generated
    long getTransactionTimeoutMs() {
        return this.transactionTimeoutMs;
    }

    @Generated
    private long getCleanupIntervalMs() {
        return this.cleanupIntervalMs;
    }

    @Generated
    ServerTransactionManager.InitialSequenceIdPolicy getInitialSequenceIdPolicy() {
        return this.initialSequenceIdPolicy;
    }

    private static class HandleWithAssignment {
        private final List<Integer> partitions = new ArrayList<Integer>();
        ObserveHandle observeHandle = null;

        private HandleWithAssignment() {
        }

        public void assign(Collection<Partition> partitions) {
            this.partitions.clear();
            this.partitions.addAll(partitions.stream().map(Partition::getId).collect(Collectors.toList()));
        }

        public void withHandle(ObserveHandle handle) {
            this.observeHandle = handle;
        }

        @Generated
        public List<Integer> getPartitions() {
            return this.partitions;
        }

        @Generated
        public ObserveHandle getObserveHandle() {
            return this.observeHandle;
        }
    }

    @VisibleForTesting
    class CachedTransaction
    implements AutoCloseable {
        final String transactionId;
        final long created = System.currentTimeMillis();
        final Map<AttributeDescriptor<?>, DirectAttributeFamilyDescriptor> attributeToFamily = new HashMap();
        @Nullable
        final BiConsumer<String, Response> responseConsumer;
        @Nullable
        OnlineAttributeWriter requestWriter;
        @Nullable
        OnlineAttributeWriter commitWriter;
        @Nullable
        CachedView stateView;
        int requestId = 1;

        CachedTransaction(String transactionId, @Nullable Collection<KeyAttribute> attributes, BiConsumer<String, Response> responseConsumer) {
            this.transactionId = transactionId;
            this.attributeToFamily.putAll(TransactionResourceManager.this.findFamilyForTransactionalAttribute(attributes.stream().map(KeyAttribute::getAttributeDescriptor).collect(Collectors.toList())));
            this.responseConsumer = responseConsumer;
        }

        void open(List<KeyAttribute> inputAttrs) {
            log.debug("Opening transaction {} with inputAttrs {}", (Object)this.transactionId, inputAttrs);
            Preconditions.checkState((this.responseConsumer != null ? 1 : 0) != 0);
            TransactionResourceManager.this.addTransactionResponseConsumer(this.transactionId, this.attributeToFamily.get(TransactionResourceManager.this.responseDesc), (BiConsumer<String, Response>)this.responseConsumer);
            this.sendRequest(Request.builder().flags(Request.Flags.OPEN).inputAttributes(inputAttrs).build(), "open." + this.requestId++);
        }

        public void commit(List<KeyAttribute> outputAttributes) {
            log.debug("Committing transaction {} with outputAttributes {}", (Object)this.transactionId, outputAttributes);
            this.sendRequest(Request.builder().flags(Request.Flags.COMMIT).outputAttributes(outputAttributes).build(), "commit");
        }

        public void update(List<KeyAttribute> addedAttributes) {
            log.debug("Updating transaction {} with addedAttributes {}", (Object)this.transactionId, addedAttributes);
            this.sendRequest(Request.builder().flags(Request.Flags.UPDATE).inputAttributes(addedAttributes).build(), "update." + this.requestId++);
        }

        public void rollback() {
            log.debug("Rolling back transaction {} (cached {})", (Object)this.transactionId, (Object)this);
            this.sendRequest(Request.builder().flags(Request.Flags.ROLLBACK).build(), "rollback");
        }

        private void sendRequest(Request request, String requestId) {
            CountDownLatch latch = new CountDownLatch(1);
            AtomicReference error = new AtomicReference();
            Pair<List<Integer>, OnlineAttributeWriter> writerWithAssignedPartitions = this.getRequestWriter();
            Preconditions.checkState((!((List)writerWithAssignedPartitions.getFirst()).isEmpty() ? 1 : 0) != 0, (Object)"Received empty partitions to observe for responses to transactional requests. Please see if you have enough partitions and if your clients can correctly resolve hostnames");
            ((OnlineAttributeWriter)writerWithAssignedPartitions.getSecond()).write(TransactionResourceManager.this.requestDesc.upsert(this.transactionId, requestId, System.currentTimeMillis(), (Object)request.withResponsePartitionId(this.pickOneAtRandom((List)writerWithAssignedPartitions.getFirst()))), (succ, exc) -> {
                error.set(exc);
                latch.countDown();
            });
            ExceptionUtils.ignoringInterrupted(latch::await);
            if (error.get() != null) {
                throw new IllegalStateException((Throwable)error.get());
            }
        }

        private int pickOneAtRandom(List<Integer> partitions) {
            return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
        }

        @Override
        public void close() {
            this.commitWriter = null;
            this.requestWriter = null;
            this.stateView = null;
        }

        Pair<List<Integer>, OnlineAttributeWriter> getRequestWriter() {
            if (this.requestWriter == null) {
                DirectAttributeFamilyDescriptor family = this.attributeToFamily.get(TransactionResourceManager.this.requestDesc);
                this.requestWriter = TransactionResourceManager.this.getCachedAccessors(family).getOrCreateWriter();
            }
            DirectAttributeFamilyDescriptor responseFamily = this.attributeToFamily.get(TransactionResourceManager.this.responseDesc);
            return Pair.of(Objects.requireNonNull(((HandleWithAssignment)TransactionResourceManager.this.clientObservedFamilies.get(responseFamily)).getPartitions()), (Object)this.requestWriter);
        }

        public DirectAttributeFamilyDescriptor getResponseFamily() {
            return Objects.requireNonNull(this.attributeToFamily.get(TransactionResourceManager.this.responseDesc));
        }

        public DirectAttributeFamilyDescriptor getStateFamily() {
            return Objects.requireNonNull(this.attributeToFamily.get(TransactionResourceManager.this.stateDesc));
        }

        public OnlineAttributeWriter getCommitWriter() {
            if (this.commitWriter == null) {
                DirectAttributeFamilyDescriptor family = this.attributeToFamily.get(TransactionResourceManager.this.getCommitDesc());
                this.commitWriter = TransactionResourceManager.this.getCachedAccessors(family).getOrCreateWriter();
            }
            return this.commitWriter;
        }

        CachedView getStateView() {
            if (this.stateView == null) {
                DirectAttributeFamilyDescriptor family = this.attributeToFamily.get(TransactionResourceManager.this.stateDesc);
                this.stateView = TransactionResourceManager.this.getCachedAccessors(family).getOrCreateStateView();
            }
            return this.stateView;
        }

        public State getState() {
            return this.getStateView().get(this.transactionId, TransactionResourceManager.this.stateDesc).map(KeyValue::getParsedRequired).orElse(State.empty());
        }

        @Generated
        public String getTransactionId() {
            return this.transactionId;
        }

        @Generated
        public long getCreated() {
            return this.created;
        }
    }

    private class CachedWriters
    implements AutoCloseable {
        private final DirectAttributeFamilyDescriptor family;
        @Nullable
        OnlineAttributeWriter writer;
        @Nullable
        CachedView stateView;

        CachedWriters(DirectAttributeFamilyDescriptor family) {
            this.family = family;
        }

        @Override
        public void close() {
            Optional.ofNullable(this.writer).ifPresent(AttributeWriterBase::close);
            Optional.ofNullable(this.stateView).ifPresent(CachedView::close);
        }

        public OnlineAttributeWriter getOrCreateWriter() {
            if (this.writer == null) {
                this.writer = ((AttributeWriterBase)Optionals.get(this.family.getWriter())).online();
            }
            return this.writer;
        }

        public CachedView getOrCreateStateView() {
            if (this.stateView == null) {
                this.stateView = (CachedView)TransactionResourceManager.this.stateViews.get(this.family);
                Preconditions.checkState((this.stateView != null ? 1 : 0) != 0, (String)"StateView not initialized for family %s. Initialized families are %s", (Object)this.family, TransactionResourceManager.this.stateViews.keySet());
            }
            return this.stateView;
        }
    }

    private class TransactionConfig
    implements ServerTransactionManager.ServerTransactionConfig {
        private TransactionConfig() {
        }

        @Override
        public long getCleanupInterval() {
            return TransactionResourceManager.this.cleanupIntervalMs;
        }

        @Override
        public ServerTransactionManager.InitialSequenceIdPolicy getInitialSeqIdPolicy() {
            return TransactionResourceManager.this.initialSequenceIdPolicy;
        }
    }
}

