/*
 * Decompiled with CFR 0.152.
 */
package io.deephaven.kafka.publish;

import io.deephaven.base.clock.Clock;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.BigDecimalUtils;
import io.deephaven.engine.util.string.StringUtils;
import io.deephaven.kafka.KafkaSchemaUtils;
import io.deephaven.kafka.publish.KeyOrValueSerializer;
import io.deephaven.time.DateTimeUtils;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.type.TypeUtils;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordKeyOrValueSerializer
implements KeyOrValueSerializer<GenericRecord> {
    private final Table source;
    private final Schema schema;
    protected final List<GenericRecordFieldProcessor> fieldProcessors = new ArrayList<GenericRecordFieldProcessor>();

    public GenericRecordKeyOrValueSerializer(Table source, Schema schema, String[] columnNames, String timestampFieldName, Properties columnProperties) {
        this.source = source;
        if (schema.isUnion()) {
            throw new UnsupportedOperationException("Schemas defined as a union of records are not supported");
        }
        Schema.Type type = schema.getType();
        if (type != Schema.Type.RECORD) {
            throw new IllegalArgumentException("The schema is not a toplevel record definition.");
        }
        boolean haveTimestampField = !StringUtils.isNullOrEmpty((String)timestampFieldName);
        TimeUnit timestampFieldUnit = null;
        List fields = schema.getFields();
        if (haveTimestampField) {
            timestampFieldUnit = GenericRecordKeyOrValueSerializer.checkTimestampFieldAndGetUnit(fields, timestampFieldName);
        }
        this.schema = schema;
        int i = 0;
        for (Schema.Field field : fields) {
            if (haveTimestampField && timestampFieldName.equals(field.name())) continue;
            this.makeFieldProcessor(field, columnNames == null ? null : columnNames[i], columnProperties);
            ++i;
        }
        if (haveTimestampField) {
            this.fieldProcessors.add(new TimestampFieldProcessor(timestampFieldName, timestampFieldUnit));
        }
    }

    private static TimeUnit checkTimestampFieldAndGetUnit(List<Schema.Field> fields, String timestampFieldName) {
        Schema.Field timestampField = null;
        for (Schema.Field field : fields) {
            if (!timestampFieldName.equals(field.name())) continue;
            timestampField = field;
            break;
        }
        if (timestampField == null) {
            throw new IllegalArgumentException("Field of name timestampFieldName='" + timestampFieldName + "' not found.");
        }
        Schema fieldSchema = timestampField.schema();
        Schema.Type type = fieldSchema.getType();
        if (type != Schema.Type.LONG) {
            throw new IllegalArgumentException("Field of name timestampFieldName='" + timestampFieldName + "' has wrong type " + type);
        }
        LogicalType logicalType = fieldSchema.getLogicalType();
        if (LogicalTypes.timestampMicros().equals(logicalType)) {
            return TimeUnit.MICROSECONDS;
        }
        if (LogicalTypes.timestampMillis().equals(logicalType)) {
            return TimeUnit.MILLISECONDS;
        }
        throw new IllegalStateException("Field of name timestampFieldName='" + timestampFieldName + "' has wrong logical type " + logicalType);
    }

    private static <ChunkType extends Chunk<Values>> GenericRecordFieldProcessor makeGenericFieldProcessor(String fieldName, ColumnSource<?> chunkSource, final GetFieldElementFun<ChunkType> fun) {
        return new GenericRecordFieldProcessorImpl<ChunkType>(fieldName, chunkSource){

            @Override
            Object getFieldElement(int ii, ChunkType inputChunk) {
                return fun.get(ii, inputChunk);
            }
        };
    }

    private static GenericRecordFieldProcessor makeByteFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((byte)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeCharFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((char)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeShortFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((short)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeIntFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((int)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeLongFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((long)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeLongFieldProcessorWithInverseFactor(String fieldName, ColumnSource<?> chunkSource, long denominator) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> {
            long raw = inputChunk.get(ii);
            if (raw == Long.MIN_VALUE) {
                return null;
            }
            return raw / denominator;
        });
    }

    private static GenericRecordFieldProcessor makeFloatFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((float)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeDoubleFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> TypeUtils.box((double)inputChunk.get(ii)));
    }

    private static GenericRecordFieldProcessor makeObjectFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> inputChunk.get(ii));
    }

    private static GenericRecordFieldProcessor makeInstantToMillisFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> {
            Instant val = (Instant)inputChunk.get(ii);
            return val == null ? null : Long.valueOf(val.toEpochMilli());
        });
    }

    private static GenericRecordFieldProcessor makeInstantToMicrosFieldProcessor(String fieldName, ColumnSource<?> chunkSource) {
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> {
            Instant val = (Instant)inputChunk.get(ii);
            return val == null ? null : Long.valueOf(DateTimeUtils.epochMicros((Instant)val));
        });
    }

    private static BigInteger toBigIntegerAtPrecisionAndScale(BigDecimal v, MathContext mathContext, int scale) {
        BigDecimal rescaled = v.scaleByPowerOfTen(scale).setScale(0, mathContext.getRoundingMode()).round(mathContext);
        return rescaled.toBigIntegerExact();
    }

    private static GenericRecordFieldProcessor makeBigDecimalFieldProcessor(String fieldName, ColumnSource<?> chunkSource, int precision, int scale) {
        MathContext mathContext = new MathContext(precision, RoundingMode.HALF_UP);
        return GenericRecordKeyOrValueSerializer.makeGenericFieldProcessor(fieldName, chunkSource, (ii, inputChunk) -> {
            BigDecimal bd = (BigDecimal)inputChunk.get(ii);
            BigInteger bi = GenericRecordKeyOrValueSerializer.toBigIntegerAtPrecisionAndScale(bd, mathContext, scale);
            return ByteBuffer.wrap(bi.toByteArray());
        });
    }

    private GenericRecordFieldProcessor getLongProcessor(Schema.Field field, String fieldName, Class<?> columnType, ColumnSource<?> src) {
        Schema fieldSchema = field.schema();
        if (columnType == Instant.class && fieldSchema.getType() == Schema.Type.LONG) {
            LogicalType logicalType = fieldSchema.getLogicalType();
            if (LogicalTypes.timestampMicros().equals(logicalType)) {
                return GenericRecordKeyOrValueSerializer.makeLongFieldProcessorWithInverseFactor(fieldName, src, 1000L);
            }
            if (LogicalTypes.timestampMillis().equals(logicalType)) {
                return GenericRecordKeyOrValueSerializer.makeLongFieldProcessorWithInverseFactor(fieldName, src, 1000000L);
            }
        }
        return GenericRecordKeyOrValueSerializer.makeLongFieldProcessor(fieldName, src);
    }

    final String getLogicalType(String fieldName, Schema.Field field) {
        Schema effectiveSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, field.schema());
        return effectiveSchema.getProp("logicalType");
    }

    private void makeFieldProcessor(Schema.Field field, String columnNameIn, Properties columnProperties) {
        String fieldName = field.name();
        String columnName = columnNameIn == null ? fieldName : columnNameIn;
        ColumnSource src = this.source.getColumnSource(columnName);
        Class type = src.getType();
        GenericRecordFieldProcessor proc = this.getFieldProcessorForType(type, field, fieldName, columnName, src, columnProperties);
        this.fieldProcessors.add(proc);
    }

    private GenericRecordFieldProcessor getFieldProcessorForType(Class<?> type, Schema.Field field, String fieldName, String columnName, ColumnSource<?> src, Properties columnProperties) {
        if (type == Character.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeCharFieldProcessor(fieldName, src);
        }
        if (type == Byte.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeByteFieldProcessor(fieldName, src);
        }
        if (type == Short.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeShortFieldProcessor(fieldName, src);
        }
        if (type == Integer.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeIntFieldProcessor(fieldName, src);
        }
        if (type == Long.TYPE) {
            return this.getLongProcessor(field, fieldName, this.source.getDefinition().getColumn(columnName).getDataType(), src);
        }
        if (type == Float.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeFloatFieldProcessor(fieldName, src);
        }
        if (type == Double.TYPE) {
            return GenericRecordKeyOrValueSerializer.makeDoubleFieldProcessor(fieldName, src);
        }
        if (type == Instant.class) {
            String logicalType = this.getLogicalType(fieldName, field);
            if (logicalType == null) {
                throw new IllegalArgumentException("field " + fieldName + " for column " + columnName + " has no logical type.");
            }
            if (logicalType.equals("timestamp-millis")) {
                return GenericRecordKeyOrValueSerializer.makeInstantToMillisFieldProcessor(fieldName, src);
            }
            if (logicalType.equals("timestamp-micros")) {
                return GenericRecordKeyOrValueSerializer.makeInstantToMicrosFieldProcessor(fieldName, src);
            }
            throw new IllegalArgumentException("field " + fieldName + " for column " + columnName + " has unrecognized logical type " + logicalType);
        }
        if (type == BigDecimal.class) {
            BigDecimalUtils.PropertyNames propertyNames = new BigDecimalUtils.PropertyNames(columnName);
            BigDecimalUtils.PrecisionAndScale precisionAndScale = BigDecimalUtils.getPrecisionAndScaleFromColumnProperties((BigDecimalUtils.PropertyNames)propertyNames, (Properties)columnProperties, (boolean)true);
            return GenericRecordKeyOrValueSerializer.makeBigDecimalFieldProcessor(fieldName, src, precisionAndScale.precision, precisionAndScale.scale);
        }
        return GenericRecordKeyOrValueSerializer.makeObjectFieldProcessor(fieldName, src);
    }

    @Override
    public ObjectChunk<GenericRecord, Values> handleChunk(KeyOrValueSerializer.Context context, RowSequence toProcess, boolean previous) {
        AvroContext avroContext = (AvroContext)context;
        avroContext.avroChunk.setSize(toProcess.intSize());
        for (int position = 0; position < toProcess.intSize(); ++position) {
            avroContext.avroChunk.set(position, (Object)new GenericData.Record(this.schema));
        }
        for (int ii = 0; ii < this.fieldProcessors.size(); ++ii) {
            this.fieldProcessors.get(ii).processField(avroContext.fieldContexts[ii], avroContext.avroChunk, toProcess, previous);
        }
        return avroContext.avroChunk;
    }

    @Override
    public KeyOrValueSerializer.Context makeContext(int size) {
        return new AvroContext(size);
    }

    private final class AvroContext
    implements KeyOrValueSerializer.Context {
        private final WritableObjectChunk<GenericRecord, Values> avroChunk;
        private final FieldContext[] fieldContexts;

        public AvroContext(int size) {
            this.avroChunk = WritableObjectChunk.makeWritableChunk((int)size);
            this.fieldContexts = new FieldContext[GenericRecordKeyOrValueSerializer.this.fieldProcessors.size()];
            for (int ii = 0; ii < GenericRecordKeyOrValueSerializer.this.fieldProcessors.size(); ++ii) {
                this.fieldContexts[ii] = GenericRecordKeyOrValueSerializer.this.fieldProcessors.get(ii).makeContext(size);
            }
        }

        public void close() {
            this.avroChunk.close();
            SafeCloseable.closeAll((AutoCloseable[])this.fieldContexts);
        }
    }

    private static class TimestampFieldProcessor
    extends GenericRecordFieldProcessor {
        private final TimeUnit unit;

        public TimestampFieldProcessor(String fieldName, TimeUnit unit) {
            super(fieldName);
            switch (unit) {
                case MICROSECONDS: 
                case MILLISECONDS: {
                    this.unit = unit;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unit not supported: " + unit);
                }
            }
        }

        @Override
        FieldContext makeContext(int size) {
            return null;
        }

        @Override
        public void processField(FieldContext fieldContext, WritableObjectChunk<GenericRecord, Values> avroChunk, RowSequence keys, boolean isRemoval) {
            long unitTime;
            switch (this.unit) {
                case MICROSECONDS: {
                    unitTime = Clock.system().currentTimeMicros();
                    break;
                }
                case MILLISECONDS: {
                    unitTime = Clock.system().currentTimeMillis();
                    break;
                }
                default: {
                    throw new IllegalStateException();
                }
            }
            for (int ii = 0; ii < avroChunk.size(); ++ii) {
                ((GenericRecord)avroChunk.get(ii)).put(this.fieldName, (Object)unitTime);
            }
        }
    }

    @FunctionalInterface
    private static interface GetFieldElementFun<ChunkType extends Chunk<Values>> {
        public Object get(int var1, ChunkType var2);
    }

    private static abstract class GenericRecordFieldProcessorImpl<ChunkType extends Chunk<Values>>
    extends GenericRecordFieldProcessor {
        private final ColumnSource<?> chunkSource;

        public GenericRecordFieldProcessorImpl(String fieldName, ColumnSource<?> chunkSource) {
            super(fieldName);
            this.chunkSource = chunkSource;
        }

        @Override
        FieldContext makeContext(int size) {
            return new ContextImpl(size);
        }

        abstract Object getFieldElement(int var1, ChunkType var2);

        @Override
        void processField(FieldContext fieldContext, WritableObjectChunk<GenericRecord, Values> avroChunk, RowSequence keys, boolean previous) {
            ContextImpl contextImpl = (ContextImpl)fieldContext;
            Chunk inputChunk = previous ? this.chunkSource.getPrevChunk(contextImpl.getContext, keys) : this.chunkSource.getChunk(contextImpl.getContext, keys);
            for (int ii = 0; ii < inputChunk.size(); ++ii) {
                ((GenericRecord)avroChunk.get(ii)).put(this.fieldName, this.getFieldElement(ii, inputChunk));
            }
        }

        class ContextImpl
        implements FieldContext {
            ChunkSource.GetContext getContext;

            ContextImpl(int size) {
                this.getContext = GenericRecordFieldProcessorImpl.this.chunkSource.makeGetContext(size);
            }

            public void close() {
                this.getContext.close();
            }
        }
    }

    private static abstract class GenericRecordFieldProcessor {
        final String fieldName;

        public GenericRecordFieldProcessor(String fieldName) {
            this.fieldName = fieldName;
        }

        abstract FieldContext makeContext(int var1);

        abstract void processField(FieldContext var1, WritableObjectChunk<GenericRecord, Values> var2, RowSequence var3, boolean var4);
    }

    static interface FieldContext
    extends SafeCloseable {
    }
}

