/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

class SavepointV1Serializer
implements SavepointSerializer<SavepointV1> {
    private static final byte NULL_HANDLE = 0;
    private static final byte BYTE_STREAM_STATE_HANDLE = 1;
    private static final byte FILE_STREAM_STATE_HANDLE = 2;
    private static final byte KEY_GROUPS_HANDLE = 3;
    private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
    public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();

    private SavepointV1Serializer() {
    }

    @Override
    public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
        try {
            dos.writeLong(savepoint.getCheckpointId());
            Collection<TaskState> taskStates = savepoint.getTaskStates();
            dos.writeInt(taskStates.size());
            for (TaskState taskState : savepoint.getTaskStates()) {
                dos.writeLong(taskState.getJobVertexID().getLowerPart());
                dos.writeLong(taskState.getJobVertexID().getUpperPart());
                int parallelism = taskState.getParallelism();
                dos.writeInt(parallelism);
                dos.writeInt(taskState.getMaxParallelism());
                dos.writeInt(taskState.getChainLength());
                Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
                dos.writeInt(subtaskStateMap.size());
                for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
                    dos.writeInt(entry.getKey());
                    SavepointV1Serializer.serializeSubtaskState(entry.getValue(), dos);
                }
            }
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public SavepointV1 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
        long checkpointId = dis.readLong();
        int numTaskStates = dis.readInt();
        ArrayList<TaskState> taskStates = new ArrayList<TaskState>(numTaskStates);
        for (int i = 0; i < numTaskStates; ++i) {
            JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
            int parallelism = dis.readInt();
            int maxParallelism = dis.readInt();
            int chainLength = dis.readInt();
            TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
            taskStates.add(taskState);
            int numSubTaskStates = dis.readInt();
            for (int j = 0; j < numSubTaskStates; ++j) {
                int subtaskIndex = dis.readInt();
                SubtaskState subtaskState = SavepointV1Serializer.deserializeSubtaskState(dis);
                taskState.putState(subtaskIndex, subtaskState);
            }
        }
        return new SavepointV1(checkpointId, taskStates);
    }

    private static void serializeSubtaskState(SubtaskState subtaskState, DataOutputStream dos) throws IOException {
        dos.writeLong(-1L);
        ChainedStateHandle<StreamStateHandle> nonPartitionableState = subtaskState.getLegacyOperatorState();
        int len = nonPartitionableState != null ? nonPartitionableState.getLength() : 0;
        dos.writeInt(len);
        for (int i = 0; i < len; ++i) {
            StreamStateHandle stateHandle = nonPartitionableState.get(i);
            SavepointV1Serializer.serializeStreamStateHandle(stateHandle, dos);
        }
        ChainedStateHandle<OperatorStateHandle> operatorStateBackend = subtaskState.getManagedOperatorState();
        len = operatorStateBackend != null ? operatorStateBackend.getLength() : 0;
        dos.writeInt(len);
        for (int i = 0; i < len; ++i) {
            OperatorStateHandle stateHandle = operatorStateBackend.get(i);
            SavepointV1Serializer.serializeOperatorStateHandle(stateHandle, dos);
        }
        ChainedStateHandle<OperatorStateHandle> operatorStateFromStream = subtaskState.getRawOperatorState();
        len = operatorStateFromStream != null ? operatorStateFromStream.getLength() : 0;
        dos.writeInt(len);
        for (int i = 0; i < len; ++i) {
            OperatorStateHandle stateHandle = operatorStateFromStream.get(i);
            SavepointV1Serializer.serializeOperatorStateHandle(stateHandle, dos);
        }
        KeyGroupsStateHandle keyedStateBackend = subtaskState.getManagedKeyedState();
        SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateBackend, dos);
        KeyGroupsStateHandle keyedStateStream = subtaskState.getRawKeyedState();
        SavepointV1Serializer.serializeKeyGroupStateHandle(keyedStateStream, dos);
    }

    private static SubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
        long ignoredDuration = dis.readLong();
        int len = dis.readInt();
        ArrayList<StreamStateHandle> nonPartitionableState = new ArrayList<StreamStateHandle>(len);
        for (int i = 0; i < len; ++i) {
            StreamStateHandle streamStateHandle = SavepointV1Serializer.deserializeStreamStateHandle(dis);
            nonPartitionableState.add(streamStateHandle);
        }
        len = dis.readInt();
        ArrayList<OperatorStateHandle> operatorStateBackend = new ArrayList<OperatorStateHandle>(len);
        for (int i = 0; i < len; ++i) {
            OperatorStateHandle streamStateHandle = SavepointV1Serializer.deserializeOperatorStateHandle(dis);
            operatorStateBackend.add(streamStateHandle);
        }
        len = dis.readInt();
        ArrayList<OperatorStateHandle> operatorStateStream = new ArrayList<OperatorStateHandle>(len);
        for (int i = 0; i < len; ++i) {
            OperatorStateHandle streamStateHandle = SavepointV1Serializer.deserializeOperatorStateHandle(dis);
            operatorStateStream.add(streamStateHandle);
        }
        KeyGroupsStateHandle keyedStateBackend = SavepointV1Serializer.deserializeKeyGroupStateHandle(dis);
        KeyGroupsStateHandle keyedStateStream = SavepointV1Serializer.deserializeKeyGroupStateHandle(dis);
        ChainedStateHandle<StreamStateHandle> nonPartitionableStateChain = new ChainedStateHandle<StreamStateHandle>(nonPartitionableState);
        ChainedStateHandle<OperatorStateHandle> operatorStateBackendChain = new ChainedStateHandle<OperatorStateHandle>(operatorStateBackend);
        ChainedStateHandle<OperatorStateHandle> operatorStateStreamChain = new ChainedStateHandle<OperatorStateHandle>(operatorStateStream);
        return new SubtaskState(nonPartitionableStateChain, operatorStateBackendChain, operatorStateStreamChain, keyedStateBackend, keyedStateStream);
    }

    private static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle != null) {
            dos.writeByte(3);
            dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(stateHandle.getNumberOfKeyGroups());
            for (int keyGroup : stateHandle.keyGroups()) {
                dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
            }
            SavepointV1Serializer.serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
        } else {
            dos.writeByte(0);
        }
    }

    private static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (3 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long[] offsets = new long[numKeyGroups];
            for (int i = 0; i < numKeyGroups; ++i) {
                offsets[i] = dis.readLong();
            }
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
            StreamStateHandle stateHandle = SavepointV1Serializer.deserializeStreamStateHandle(dis);
            return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
        }
        throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type);
    }

    private static void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle != null) {
            dos.writeByte(4);
            Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets();
            dos.writeInt(partitionOffsetsMap.size());
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
                dos.writeUTF(entry.getKey());
                OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
                int mode = stateMetaInfo.getDistributionMode().ordinal();
                dos.writeByte(mode);
                long[] offsets = stateMetaInfo.getOffsets();
                dos.writeInt(offsets.length);
                for (long offset : offsets) {
                    dos.writeLong(offset);
                }
            }
            SavepointV1Serializer.serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
        } else {
            dos.writeByte(0);
        }
    }

    private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (4 == type) {
            int mapSize = dis.readInt();
            HashMap<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<String, OperatorStateHandle.StateMetaInfo>(mapSize);
            for (int i = 0; i < mapSize; ++i) {
                String key = dis.readUTF();
                byte modeOrdinal = dis.readByte();
                OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
                long[] offsets = new long[dis.readInt()];
                for (int j = 0; j < offsets.length; ++j) {
                    offsets[j] = dis.readLong();
                }
                OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(offsets, mode);
                offsetsMap.put(key, metaInfo);
            }
            StreamStateHandle stateHandle = SavepointV1Serializer.deserializeStreamStateHandle(dis);
            return new OperatorStateHandle(offsetsMap, stateHandle);
        }
        throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
    }

    private static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof FileStateHandle) {
            dos.writeByte(2);
            FileStateHandle fileStateHandle = (FileStateHandle)stateHandle;
            dos.writeLong(stateHandle.getStateSize());
            dos.writeUTF(fileStateHandle.getFilePath().toString());
        } else if (stateHandle instanceof ByteStreamStateHandle) {
            dos.writeByte(1);
            ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle)stateHandle;
            dos.writeUTF(byteStreamStateHandle.getHandleName());
            byte[] internalData = byteStreamStateHandle.getData();
            dos.writeInt(internalData.length);
            dos.write(byteStreamStateHandle.getData());
        } else {
            throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
        }
        dos.flush();
    }

    private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
        int type = dis.read();
        if (0 == type) {
            return null;
        }
        if (2 == type) {
            long size = dis.readLong();
            String pathString = dis.readUTF();
            return new FileStateHandle(new Path(pathString), size);
        }
        if (1 == type) {
            String handleName = dis.readUTF();
            int numBytes = dis.readInt();
            byte[] data = new byte[numBytes];
            dis.readFully(data);
            return new ByteStreamStateHandle(handleName, data);
        }
        throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
    }
}

