/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.core;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.springframework.core.convert.ConversionService;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.Limit;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStreamCommands;
import org.springframework.data.redis.connection.stream.ByteRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.AbstractOperations;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamObjectMapper;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.hash.HashMapper;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.support.collections.CollectionUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Package-private {@link StreamOperations} implementation that serializes keys and hash
 * entries through the serializers exposed by {@link AbstractOperations} and forwards each
 * operation to the stream commands of a {@link RedisConnection}.
 *
 * @param <K> stream key type
 * @param <HK> hash (field) key type of a stream record
 * @param <HV> hash (field) value type of a stream record
 */
class DefaultStreamOperations<K, HK, HV>
        extends AbstractOperations<K, Object>
        implements StreamOperations<K, HK, HV> {

    /** Resolves the {@link HashMapper} used to turn objects into record bodies and back. */
    private final StreamObjectMapper objectMapper;

    /**
     * Creates the operations for the given template.
     *
     * <p>The {@link StreamObjectMapper} created here is specialised so that target types it
     * reports as simple (see {@code isSimpleType}) are stored as a single-entry hash under
     * the field name {@code "payload"}; all other types fall back to the default mapper
     * lookup of the superclass.
     *
     * @param template the template whose serializer configuration is consulted
     * @param mapper optional hash mapper handed to the {@link StreamObjectMapper}; may be {@code null}
     */
    DefaultStreamOperations(final RedisTemplate<K, ?> template, @Nullable HashMapper<? super K, ? super HK, ? super HV> mapper) {
        super(template);
        this.objectMapper = new StreamObjectMapper(mapper) {

            @Override
            protected HashMapper<?, ?, ?> doGetHashMapper(final ConversionService conversionService, final Class<?> targetType) {
                if (!isSimpleType(targetType)) {
                    return super.doGetHashMapper(conversionService, targetType);
                }
                return new HashMapper<Object, Object, Object>() {

                    /**
                     * Wraps the value into a single-entry map keyed by {@code "payload"}.
                     * Only when the template's default serializer is disabled: a missing
                     * hash-key serializer makes the key UTF-8 bytes, and a missing
                     * hash-value serializer routes the value through
                     * {@code serializeHashValueIfRequires}.
                     */
                    @Override
                    @SuppressWarnings("unchecked")
                    public Map<Object, Object> toHash(Object object) {
                        Object key = "payload";
                        Object value = object;
                        if (!template.isEnableDefaultSerializer()) {
                            if (template.getHashKeySerializer() == null) {
                                key = key.toString().getBytes(StandardCharsets.UTF_8);
                            }
                            if (template.getHashValueSerializer() == null) {
                                // Unchecked: the mapper is only handed values of the stream's HV type.
                                value = DefaultStreamOperations.this.serializeHashValueIfRequires((HV) object);
                            }
                        }
                        return Collections.singletonMap(key, value);
                    }

                    /**
                     * Extracts the first hash value and returns it as {@code targetType}:
                     * as-is when it already matches, otherwise after deserialization and,
                     * if still required, conversion through the {@link ConversionService}.
                     */
                    @Override
                    public Object fromHash(Map<Object, Object> hash) {
                        Object value = hash.values().iterator().next();
                        if (ClassUtils.isAssignableValue(targetType, value)) {
                            return value;
                        }
                        // NOTE(review): the value is assumed to be a byte[] at this point; any
                        // other non-matching type fails with a ClassCastException as before.
                        Object deserialized = DefaultStreamOperations.this.deserializeHashValue((byte[]) value);
                        if (ClassUtils.isAssignableValue(targetType, deserialized)) {
                            // The raw value already failed the type check above, so the
                            // deserialized form is the one that must be handed back.
                            return deserialized;
                        }
                        return conversionService.convert(deserialized, targetType);
                    }
                };
            }
        };
    }

    /**
     * Acknowledges the given record ids for {@code group} on the stream at {@code key}
     * via {@code xAck}.
     *
     * @return the result reported by {@code xAck}
     */
    @Override
    public Long acknowledge(K key, String group, String... recordIds) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xAck(rawKey, group, recordIds));
    }

    /**
     * Converts the record into a {@link MapRecord}, serializes it with the key, hash-key
     * and hash-value serializers and appends it via {@code xAdd}.
     *
     * @param record the record to append; must not be {@code null}
     * @return the result reported by {@code xAdd}
     */
    @Override
    @Nullable
    public RecordId add(Record<K, ?> record) {
        Assert.notNull(record, "Record must not be null");
        MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record);
        ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
        return execute(connection -> connection.xAdd(binaryRecord));
    }

    /**
     * Claims records for {@code newOwner} via {@code xClaim} and deserializes them.
     *
     * @return the claimed records, passed through {@link CollectionUtils#nullSafeList}
     */
    @Override
    public List<MapRecord<K, HK, HV>> claim(final K key, final String consumerGroup, final String newOwner, final RedisStreamCommands.XClaimOptions xClaimOptions) {
        return CollectionUtils.nullSafeList(execute(new RecordDeserializingRedisCallback() {

            @Override
            @Nullable
            List<ByteRecord> inRedis(RedisConnection connection) {
                return connection.streamCommands().xClaim(DefaultStreamOperations.this.rawKey(key), consumerGroup, newOwner, xClaimOptions);
            }
        }));
    }

    /**
     * Removes the given records from the stream at {@code key} via {@code xDel}.
     *
     * @return the result reported by {@code xDel}
     */
    @Override
    public Long delete(K key, RecordId... recordIds) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xDel(rawKey, recordIds));
    }

    /**
     * Creates the consumer group {@code group} at {@code readOffset} via {@code xGroupCreate}.
     *
     * @return the result reported by {@code xGroupCreate}
     */
    @Override
    public String createGroup(K key, ReadOffset readOffset, String group) {
        byte[] rawKey = rawKey(key);
        // NOTE(review): the trailing 'true' is presumably the "create stream if missing"
        // flag of xGroupCreate -- confirm against RedisStreamCommands.
        return execute(connection -> connection.xGroupCreate(rawKey, group, readOffset, true));
    }

    /**
     * Deletes {@code consumer} via {@code xGroupDelConsumer}.
     *
     * @return the result reported by {@code xGroupDelConsumer}
     */
    @Override
    public Boolean deleteConsumer(K key, Consumer consumer) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xGroupDelConsumer(rawKey, consumer));
    }

    /**
     * Destroys the consumer group {@code group} via {@code xGroupDestroy}.
     *
     * @return the result reported by {@code xGroupDestroy}
     */
    @Override
    public Boolean destroyGroup(K key, String group) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xGroupDestroy(rawKey, group));
    }

    /** Returns the stream information obtained via {@code xInfo} for {@code key}. */
    @Override
    public StreamInfo.XInfoStream info(K key) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xInfo(rawKey));
    }

    /** Returns the consumer information obtained via {@code xInfoConsumers} for {@code group}. */
    @Override
    public StreamInfo.XInfoConsumers consumers(K key, String group) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xInfoConsumers(rawKey, group));
    }

    /** Returns the group information obtained via {@code xInfoGroups} for {@code key}. */
    @Override
    public StreamInfo.XInfoGroups groups(K key) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xInfoGroups(rawKey));
    }

    /**
     * Returns the pending messages of {@code group} within {@code range}, passing
     * {@code count} on to {@code xPending}.
     */
    @Override
    public PendingMessages pending(K key, String group, Range<?> range, long count) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xPending(rawKey, group, range, count));
    }

    /**
     * Returns the pending messages of {@code consumer} within {@code range}, passing
     * {@code count} on to {@code xPending}.
     */
    @Override
    public PendingMessages pending(K key, Consumer consumer, Range<?> range, long count) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xPending(rawKey, consumer, range, count));
    }

    /** Returns the pending-message summary of {@code group} obtained via {@code xPending}. */
    @Override
    public PendingMessagesSummary pending(K key, String group) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xPending(rawKey, group));
    }

    /** Returns the result of {@code xLen} for the stream at {@code key}. */
    @Override
    public Long size(K key) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xLen(rawKey));
    }

    /**
     * Reads the records within {@code range} via {@code xRange} and deserializes them.
     * The result of {@code execute} is returned unchanged.
     */
    @Override
    public List<MapRecord<K, HK, HV>> range(final K key, final Range<String> range, final Limit limit) {
        return execute(new RecordDeserializingRedisCallback() {

            @Override
            @Nullable
            List<ByteRecord> inRedis(RedisConnection connection) {
                return connection.xRange(DefaultStreamOperations.this.rawKey(key), range, limit);
            }
        });
    }

    /**
     * Reads records from the given stream offsets via {@code xRead} and deserializes them.
     * The result of {@code execute} is returned unchanged.
     */
    @Override
    public List<MapRecord<K, HK, HV>> read(final StreamReadOptions readOptions, final StreamOffset<K>... streams) {
        return execute(new RecordDeserializingRedisCallback() {

            @Override
            @Nullable
            List<ByteRecord> inRedis(RedisConnection connection) {
                return connection.xRead(readOptions, DefaultStreamOperations.this.rawStreamOffsets(streams));
            }
        });
    }

    /**
     * Reads records for {@code consumer} from the given stream offsets via
     * {@code xReadGroup} and deserializes them. The result of {@code execute} is returned
     * unchanged.
     */
    @Override
    public List<MapRecord<K, HK, HV>> read(final Consumer consumer, final StreamReadOptions readOptions, final StreamOffset<K>... streams) {
        return execute(new RecordDeserializingRedisCallback() {

            @Override
            @Nullable
            List<ByteRecord> inRedis(RedisConnection connection) {
                return connection.xReadGroup(consumer, readOptions, DefaultStreamOperations.this.rawStreamOffsets(streams));
            }
        });
    }

    /**
     * Reads the records within {@code range} via {@code xRevRange} and deserializes them.
     * The result of {@code execute} is returned unchanged.
     */
    @Override
    public List<MapRecord<K, HK, HV>> reverseRange(final K key, final Range<String> range, final Limit limit) {
        return execute(new RecordDeserializingRedisCallback() {

            @Override
            @Nullable
            List<ByteRecord> inRedis(RedisConnection connection) {
                return connection.xRevRange(DefaultStreamOperations.this.rawKey(key), range, limit);
            }
        });
    }

    /** Trims the stream at {@code key} via {@code xTrim}, passing {@code count} through. */
    @Override
    public Long trim(K key, long count) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xTrim(rawKey, count));
    }

    /**
     * Trims the stream at {@code key} via {@code xTrim}, passing {@code count} and
     * {@code approximateTrimming} through.
     */
    @Override
    public Long trim(K key, long count, boolean approximateTrimming) {
        byte[] rawKey = rawKey(key);
        return execute(connection -> connection.xTrim(rawKey, count, approximateTrimming));
    }

    /** Returns the {@link HashMapper} that the internal object mapper resolves for {@code targetType}. */
    @Override
    public <V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType) {
        return objectMapper.getHashMapper(targetType);
    }

    /** Deserializes a binary record using the key, hash-key and hash-value serializers. */
    @Override
    public MapRecord<K, HK, HV> deserializeRecord(ByteRecord record) {
        return record.deserialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
    }

    /**
     * Turns a hash value into bytes: through the hash-value serializer when one is present,
     * otherwise through the object mapper's conversion service.
     */
    protected byte[] serializeHashValueIfRequires(HV value) {
        if (hashValueSerializerPresent()) {
            return serialize(value, hashValueSerializer());
        }
        return objectMapper.getConversionService().convert(value, byte[].class);
    }

    /** Returns whether {@code hashValueSerializer()} yields a non-null serializer. */
    protected boolean hashValueSerializerPresent() {
        return hashValueSerializer() != null;
    }

    /**
     * Serializes {@code value} with {@code serializer}. When the serializer reports that it
     * cannot serialize the value's class, the value is first converted to the serializer's
     * target type.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private byte[] serialize(Object value, RedisSerializer serializer) {
        Object toSerialize = value;
        if (!serializer.canSerialize(value.getClass())) {
            toSerialize = objectMapper.getConversionService().convert(value, serializer.getTargetType());
        }
        return serializer.serialize(toSerialize);
    }

    /** Maps each stream offset to one whose key is the serialized (raw) form of the original key. */
    @SuppressWarnings("unchecked")
    private StreamOffset<byte[]>[] rawStreamOffsets(StreamOffset<K>[] streams) {
        return Arrays.stream(streams)
                .map(it -> StreamOffset.create(rawKey(it.getKey()), it.getOffset()))
                .toArray(StreamOffset[]::new);
    }

    /**
     * {@link RedisCallback} template: subclasses fetch binary records in
     * {@link #inRedis(RedisConnection)}, and this class deserializes them with
     * {@link DefaultStreamOperations#deserializeRecord(ByteRecord)}.
     */
    abstract class RecordDeserializingRedisCallback
            implements RedisCallback<List<MapRecord<K, HK, HV>>> {

        RecordDeserializingRedisCallback() {
        }

        /**
         * Runs {@link #inRedis(RedisConnection)} and deserializes every returned record.
         *
         * @return the deserialized records, or an empty list when {@code inRedis} returned {@code null}
         */
        @Override
        public final List<MapRecord<K, HK, HV>> doInRedis(RedisConnection connection) {
            List<ByteRecord> raw = inRedis(connection);
            if (raw == null) {
                return Collections.emptyList();
            }
            List<MapRecord<K, HK, HV>> result = new ArrayList<>(raw.size());
            for (ByteRecord record : raw) {
                result.add(DefaultStreamOperations.this.deserializeRecord(record));
            }
            return result;
        }

        /**
         * Fetches the binary records from Redis.
         *
         * @return the raw records; may be {@code null}
         */
        @Nullable
        abstract List<ByteRecord> inRedis(RedisConnection var1);
    }
}

