/*
 * Decompiled with CFR 0.152.
 */
package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBatch;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.stats.BrokerStatsManager;

public class SendMessageProcessor
extends AbstractSendMessageProcessor
implements NettyRequestProcessor {
    private List<ConsumeMessageHook> consumeMessageHookList;

    /**
     * Creates a processor bound to the given broker controller.
     *
     * @param brokerController controller handed unchanged to the
     *                         {@link AbstractSendMessageProcessor} constructor
     */
    public SendMessageProcessor(BrokerController brokerController) {
        super(brokerController);
    }

    /**
     * Dispatches an incoming request either to the consumer send-back path or to the
     * (single or batch) send-message path, running the send-message hooks around the latter.
     *
     * @param ctx     channel context the request arrived on
     * @param request the decoded remoting request
     * @return the response to write back; {@code null} when the request header could not be
     *         parsed, or when the send path returned {@code null}
     * @throws RemotingCommandException propagated from header parsing or the send paths
     */
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        // Request code 36 goes to the send-back handler (presumably
        // RequestCode.CONSUMER_SEND_MSG_BACK - confirm); every other code is treated as a send.
        if (request.getCode() == 36) {
            return this.consumerSendMsgBack(ctx, request);
        }

        SendMessageRequestHeader header = this.parseRequestHeader(request);
        if (header == null) {
            return null;
        }

        SendMessageContext traceContext = this.buildMsgContext(ctx, header);
        this.executeSendMessageHookBefore(ctx, request, traceContext);

        RemotingCommand response;
        if (header.isBatch()) {
            response = this.sendBatchMessage(ctx, request, traceContext, header);
        } else {
            response = this.sendMessage(ctx, request, traceContext, header);
        }

        this.executeSendMessageHookAfter(response, traceContext);
        return response;
    }

    /**
     * Tells the caller whether new requests should currently be rejected.
     *
     * @return {@code true} when the message store reports that the OS page cache is busy or
     *         that the transient store pool is deficient
     */
    @Override
    public boolean rejectRequest() {
        if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
            return true;
        }
        return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
    }

    /**
     * Handles a consumer "send message back" request: the message found at the commit-log
     * offset named in the request header is stored again under the consumer group's retry
     * topic, or under the group's DLQ topic once it has been reconsumed at least
     * {@code maxReconsumeTimes} times or the requested delay level is negative.
     *
     * <p>Every outcome is reported through the returned command's code and remark; this
     * method itself never writes to the channel.
     *
     * @param ctx     channel context of the caller; not read by this method
     * @param request the send-back request carrying a {@link ConsumerSendMsgBackRequestHeader}
     * @return the response command; code {@code 0} on success (or when the group has no retry
     *         queues), a non-zero code plus remark otherwise
     * @throws RemotingCommandException propagated from decoding the custom request header
     */
    private RemotingCommand consumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(null);
        ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

        // Consume hooks are notified before any validation, but only for requests that name an origin message id.
        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
            ConsumeMessageContext context = new ConsumeMessageContext();
            context.setConsumerGroup(requestHeader.getGroup());
            context.setTopic(requestHeader.getOriginTopic());
            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
            context.setCommercialRcvTimes(1);
            context.setCommercialOwner((String)request.getExtFields().get("Owner"));
            this.executeConsumeMessageHookAfter(context);
        }

        SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
        if (null == subscriptionGroupConfig) {
            response.setCode(26);
            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
            return response;
        }
        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
            response.setCode(16);
            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
            return response;
        }
        // A group configured without retry queues gets a success reply and nothing is stored.
        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
            response.setCode(0);
            response.setRemark(null);
            return response;
        }

        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
        int topicSysFlag = 0;
        if (requestHeader.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        // The third argument is the topic permission (6 here; presumably read|write - confirm against PermName).
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, subscriptionGroupConfig.getRetryQueueNums(), 6, topicSysFlag);
        if (null == topicConfig) {
            response.setCode(1);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
        if (!PermName.isWriteable(topicConfig.getPerm())) {
            response.setCode(16);
            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
            return response;
        }

        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset().longValue());
        if (null == msgExt) {
            response.setCode(1);
            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
            return response;
        }
        // Record the message's topic under RETRY_TOPIC the first time it is sent back.
        String retryTopic = msgExt.getProperty("RETRY_TOPIC");
        if (null == retryTopic) {
            MessageAccessor.putProperty(msgExt, "RETRY_TOPIC", msgExt.getTopic());
        }
        msgExt.setWaitStoreMsgOK(false);

        int delayLevel = requestHeader.getDelayLevel();
        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            // The header value may be absent; fall back to the group's configured limit
            // instead of failing on a null unboxing.
            Integer requestedMax = requestHeader.getMaxReconsumeTimes();
            if (requestedMax != null) {
                maxReconsumeTimes = requestedMax;
            }
        }

        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
            // Retries exhausted (or negative delay level): redirect to the group's DLQ topic.
            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
            // "% 1" always yields 0, so the DLQ queue id is always 0.
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 1;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, 1, 2, 0);
            if (null == topicConfig) {
                response.setCode(1);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        } else {
            // Delay level 0 means "let the broker decide": 3 plus the reconsume count so far.
            if (0 == delayLevel) {
                delayLevel = 3 + msgExt.getReconsumeTimes();
            }
            msgExt.setDelayTimeLevel(delayLevel);
        }

        // Build the message to store from the looked-up original, with the reconsume count incremented.
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(newTopic);
        msgInner.setBody(msgExt.getBody());
        msgInner.setFlag(msgExt.getFlag());
        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
        msgInner.setQueueId(queueIdInt);
        msgInner.setSysFlag(msgExt.getSysFlag());
        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
        msgInner.setBornHost(msgExt.getBornHost());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
        // Keep the first message id across repeated send-backs.
        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
        if (putMessageResult != null) {
            switch (putMessageResult.getPutMessageStatus()) {
                case PUT_OK: {
                    // Statistics are keyed by the RETRY_TOPIC property when present, else by the message's topic.
                    String backTopic = msgExt.getTopic();
                    String correctTopic = msgExt.getProperty("RETRY_TOPIC");
                    if (correctTopic != null) {
                        backTopic = correctTopic;
                    }
                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                    response.setCode(0);
                    response.setRemark(null);
                    return response;
                }
            }
            // Any status other than PUT_OK is reported as an error carrying the status name.
            response.setCode(1);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
            return response;
        }
        response.setCode(1);
        response.setRemark("putMessageResult is null");
        return response;
    }

    /**
     * Prepares {@code msg} for storing: when the request targets a {@code %RETRY%} topic whose
     * reconsume count has reached the allowed maximum, the message is redirected to the
     * group's DLQ topic; in every successful case the message's sys flag is set from the
     * request header (with bit {@code 2} added for {@code MULTI_TAG} topics).
     *
     * @param requestHeader header of the send request
     * @param response      response to fill with an error code and remark on failure
     * @param request       the original request; only its version is read
     * @param msg           message being built; its topic, queue id and sys flag may be updated
     * @param topicConfig   configuration of the requested topic; replaced locally by the DLQ
     *                      topic's configuration when the message is redirected
     * @return {@code true} when {@code msg} is ready to be stored, {@code false} when
     *         {@code response} has been filled with an error
     */
    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) {
        String newTopic = requestHeader.getTopic();
        if (null != newTopic && newTopic.startsWith("%RETRY%")) {
            // The consumer group name is whatever follows the retry prefix.
            String groupName = newTopic.substring("%RETRY%".length());
            SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
            if (null == subscriptionGroupConfig) {
                response.setCode(26);
                response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"));
                return false;
            }

            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                // The header value may be absent; fall back to the group's configured limit
                // instead of failing on a null unboxing.
                Integer requestedMax = requestHeader.getMaxReconsumeTimes();
                if (requestedMax != null) {
                    maxReconsumeTimes = requestedMax;
                }
            }

            Integer requestedTimes = requestHeader.getReconsumeTimes();
            int reconsumeTimes = requestedTimes == null ? 0 : requestedTimes;
            if (reconsumeTimes >= maxReconsumeTimes) {
                newTopic = MixAll.getDLQTopic(groupName);
                // "% 1" always yields 0, so the DLQ queue id is always 0.
                int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % 1;
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, 1, 2, 0);
                msg.setTopic(newTopic);
                msg.setQueueId(queueIdInt);
                if (null == topicConfig) {
                    response.setCode(1);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return false;
                }
            }
        }

        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= 2;
        }
        msg.setSysFlag(sysFlag);
        return true;
    }

    /**
     * Stores a single message described by the request and its header.
     *
     * <p>The request is rejected up front while the store clock is still before the
     * configured start-accept timestamp, when {@code msgCheck} changes the response code, when
     * retry/DLQ handling fails, or when transaction messages are configured to be rejected and
     * the message carries a {@code TRAN_MSG} property. Otherwise the message is put into the
     * store and the outcome is translated by {@code handlePutMessageResult}.
     *
     * @param ctx                channel context of the producer
     * @param request            the send request (its body is the message body)
     * @param sendMessageContext trace context passed on to {@code handlePutMessageResult}
     * @param requestHeader      parsed send header
     * @return the response to write back, or {@code null} when
     *         {@code handlePutMessageResult} returned {@code null}
     * @throws RemotingCommandException declared for callers; propagated from the helpers used here
     */
    private RemotingCommand sendMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext sendMessageContext, SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
        response.setOpaque(request.getOpaque());
        response.addExtField("MSG_REGION", this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField("TRACE_ON", String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("receive SendMessage request command, {}", (Object)request);

        long acceptFrom = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < acceptFrom) {
            response.setCode(1);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(acceptFrom)));
            return response;
        }

        // -1 is a sentinel: if msgCheck leaves it untouched, validation passed.
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        int queueId = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (queueId < 0) {
            // No queue requested: pick one of the topic's write queues at random.
            queueId = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        MessageExtBrokerInner inner = new MessageExtBrokerInner();
        inner.setTopic(requestHeader.getTopic());
        inner.setQueueId(queueId);
        if (!this.handleRetryAndDLQ(requestHeader, response, request, inner, topicConfig)) {
            return response;
        }

        inner.setBody(request.getBody());
        inner.setFlag(requestHeader.getFlag().intValue());
        MessageAccessor.setProperties(inner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        inner.setPropertiesString(requestHeader.getProperties());
        inner.setBornTimestamp(requestHeader.getBornTimestamp().longValue());
        inner.setBornHost(ctx.channel().remoteAddress());
        inner.setStoreHost(this.getStoreHost());
        Integer reconsumeTimes = requestHeader.getReconsumeTimes();
        inner.setReconsumeTimes(reconsumeTimes == null ? 0 : reconsumeTimes);

        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            String transactionFlag = inner.getProperty("TRAN_MSG");
            if (transactionFlag != null) {
                response.setCode(16);
                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                return response;
            }
        }

        PutMessageResult stored = this.brokerController.getMessageStore().putMessage(inner);
        return this.handlePutMessageResult(stored, response, request, inner, responseHeader, sendMessageContext, ctx, queueId);
    }

    /**
     * Translates the store's put result into the response, updates broker statistics and,
     * when send-message hooks are registered, fills the commercial fields of the trace context.
     *
     * <p>On a successful store the response is written immediately through
     * {@code doResponse} and {@code null} is returned; on failure the filled response is
     * returned and nothing is written here.
     *
     * @param putMessageResult   result returned by the message store; may be {@code null}
     * @param response           response command to fill
     * @param request            the original request (ext field {@code "Owner"} and body length are read)
     * @param msg                the message that was stored; its topic keys the statistics
     * @param responseHeader     custom header of {@code response}, filled on success
     * @param sendMessageContext trace context updated when send-message hooks exist
     * @param ctx                channel context used to write the success response
     * @param queueIdInt         queue id reported back in the response header
     * @return {@code null} after a success response has been written, otherwise {@code response}
     */
    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) {
        if (putMessageResult == null) {
            response.setCode(1);
            response.setRemark("store putMessage return null");
            return response;
        }

        boolean stored = false;
        switch (putMessageResult.getPutMessageStatus()) {
            // The first four statuses count as "sent"; each maps to its own response code.
            case PUT_OK: {
                response.setCode(0);
                stored = true;
                break;
            }
            case FLUSH_DISK_TIMEOUT: {
                response.setCode(10);
                stored = true;
                break;
            }
            case FLUSH_SLAVE_TIMEOUT: {
                response.setCode(12);
                stored = true;
                break;
            }
            case SLAVE_NOT_AVAILABLE: {
                response.setCode(11);
                stored = true;
                break;
            }
            // Everything below is a failure: a code plus an explanatory remark.
            case CREATE_MAPEDFILE_FAILED: {
                response.setCode(1);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            }
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED: {
                response.setCode(13);
                response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            }
            case SERVICE_NOT_AVAILABLE: {
                response.setCode(14);
                response.setRemark("service not available now, maybe disk full, " + this.diskUtil() + ", maybe your broker machine memory too small.");
                break;
            }
            case OS_PAGECACHE_BUSY: {
                response.setCode(1);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            }
            case UNKNOWN_ERROR: {
                response.setCode(1);
                response.setRemark("UNKNOWN_ERROR");
                break;
            }
            default: {
                response.setCode(1);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
            }
        }

        String commercialOwner = (String)request.getExtFields().get("Owner");

        if (!stored) {
            if (this.hasSendMessageHook()) {
                // On failure the commercial size is the request body length, counted in 64 KiB units.
                int bodySize = request.getBody().length;
                int failedUnits = (int)Math.ceil(bodySize / 65536.0);
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(failedUnits);
                sendMessageContext.setCommercialSendSize(bodySize);
                sendMessageContext.setCommercialOwner(commercialOwner);
            }
            return response;
        }

        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());

        response.setRemark(null);
        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
        responseHeader.setQueueId(Integer.valueOf(queueIdInt));
        responseHeader.setQueueOffset(Long.valueOf(putMessageResult.getAppendMessageResult().getLogicsOffset()));
        // The success response is written here, which is why null is returned below.
        this.doResponse(ctx, request, response);

        if (this.hasSendMessageHook()) {
            sendMessageContext.setMsgId(responseHeader.getMsgId());
            sendMessageContext.setQueueId(responseHeader.getQueueId());
            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
            int baseCount = this.brokerController.getBrokerConfig().getCommercialBaseCount();
            int wroteBytes = putMessageResult.getAppendMessageResult().getWroteBytes();
            // Bytes written, rounded up to 64 KiB units, times the configured base count.
            int sendTimes = (int)Math.ceil(wroteBytes / 65536.0) * baseCount;
            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
            sendMessageContext.setCommercialSendTimes(sendTimes);
            sendMessageContext.setCommercialSendSize(wroteBytes);
            sendMessageContext.setCommercialOwner(commercialOwner);
        }
        return null;
    }

    /**
     * Stores a batch of messages carried in a single request body.
     *
     * <p>Besides the checks shared with the single-message path (start-accept timestamp and
     * {@code msgCheck}), a batch is rejected when its topic is longer than 127 characters or
     * starts with {@code %RETRY%}. The batch is then put into the store via
     * {@code putMessages} and the outcome is translated by {@code handlePutMessageResult}.
     *
     * @param ctx                channel context of the producer
     * @param request            the send request (its body is the encoded batch)
     * @param sendMessageContext trace context passed on to {@code handlePutMessageResult}
     * @param requestHeader      parsed send header
     * @return the response to write back, or {@code null} when
     *         {@code handlePutMessageResult} returned {@code null}
     * @throws RemotingCommandException declared for callers; propagated from the helpers used here
     */
    private RemotingCommand sendBatchMessage(ChannelHandlerContext ctx, RemotingCommand request, SendMessageContext sendMessageContext, SendMessageRequestHeader requestHeader) throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
        response.setOpaque(request.getOpaque());
        response.addExtField("MSG_REGION", this.brokerController.getBrokerConfig().getRegionId());
        response.addExtField("TRACE_ON", String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
        log.debug("Receive SendMessage request command {}", (Object)request);

        long acceptFrom = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
        if (this.brokerController.getMessageStore().now() < acceptFrom) {
            response.setCode(1);
            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(acceptFrom)));
            return response;
        }

        // -1 is a sentinel: if msgCheck leaves it untouched, validation passed.
        response.setCode(-1);
        super.msgCheck(ctx, requestHeader, response);
        if (response.getCode() != -1) {
            return response;
        }

        int queueId = requestHeader.getQueueId();
        String topic = requestHeader.getTopic();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
        if (queueId < 0) {
            // No queue requested: pick one of the topic's write queues at random.
            queueId = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
        }

        if (topic.length() > 127) {
            response.setCode(13);
            response.setRemark("message topic length too long " + topic.length());
            return response;
        }
        if (topic != null && topic.startsWith("%RETRY%")) {
            response.setCode(13);
            response.setRemark("batch request does not support retry group " + topic);
            return response;
        }

        int sysFlag = requestHeader.getSysFlag();
        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
            sysFlag |= 2;
        }

        MessageExtBatch batch = new MessageExtBatch();
        batch.setTopic(topic);
        batch.setQueueId(queueId);
        batch.setSysFlag(sysFlag);
        batch.setFlag(requestHeader.getFlag().intValue());
        MessageAccessor.setProperties(batch, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        batch.setBody(request.getBody());
        batch.setBornTimestamp(requestHeader.getBornTimestamp().longValue());
        batch.setBornHost(ctx.channel().remoteAddress());
        batch.setStoreHost(this.getStoreHost());
        Integer reconsumeTimes = requestHeader.getReconsumeTimes();
        batch.setReconsumeTimes(reconsumeTimes == null ? 0 : reconsumeTimes);

        PutMessageResult stored = this.brokerController.getMessageStore().putMessages(batch);
        return this.handlePutMessageResult(stored, response, request, batch, responseHeader, sendMessageContext, ctx, queueId);
    }

    /**
     * Reports whether at least one consume-message hook is registered.
     *
     * @return {@code true} when the hook list is non-null and non-empty
     */
    public boolean hasConsumeMessageHook() {
        List<ConsumeMessageHook> hooks = this.consumeMessageHookList;
        return !(hooks == null || hooks.isEmpty());
    }

    /**
     * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
     *
     * <p>Hooks are best-effort: a failure in one hook is logged and does not prevent the
     * remaining hooks from running, nor does it propagate to the caller.
     *
     * @param context the consume context handed to each hook
     */
    public void executeConsumeMessageHookAfter(ConsumeMessageContext context) {
        if (!this.hasConsumeMessageHook()) {
            return;
        }
        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
            try {
                hook.consumeMessageAfter(context);
            }
            catch (Throwable e) {
                // Deliberately not rethrown, but no longer silent: a broken hook should be visible in the logs.
                log.warn("consumeMessageAfter hook threw an exception; ignored", e);
            }
        }
    }

    /**
     * Returns the store host address held in the {@code storeHost} field.
     *
     * @return the value of {@code storeHost}
     */
    @Override
    public SocketAddress getStoreHost() {
        return this.storeHost;
    }

    /**
     * Builds a one-line summary of disk usage for the commit log, consume queue and index
     * storage paths, as reported by {@code UtilAll.getDiskPartitionSpaceUsedPercent}.
     *
     * @return a string of the form {@code "CL: x CQ: y INDEX: z"} with each value formatted
     *         as {@code %5.2f}
     */
    private String diskUtil() {
        double commitLogUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
            this.brokerController.getMessageStoreConfig().getStorePathCommitLog());

        // Consume queue and index paths are both derived from the store root directory.
        double consumeQueueUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

        double indexUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir()));

        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsed, consumeQueueUsed, indexUsed);
    }

    /**
     * Replaces the list of consume-message hooks used by this processor.
     *
     * <p>The list is stored by reference (no copy is made), so later changes to it are
     * visible to this processor.
     *
     * @param consumeMessageHookList hooks to use from now on; {@code null} or an empty list
     *                               makes {@code hasConsumeMessageHook()} return {@code false}
     */
    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
        this.consumeMessageHookList = consumeMessageHookList;
    }
}

