package com.els.modules.message.listener;

import com.alibaba.fastjson.JSONObject;
import com.els.common.exception.ELSBootException;
import com.els.modules.message.service.MsgService;
import com.els.rpc.service.InvokeBaseRpcService;
import com.tongtech.client.annotation.HTPMessageListener;
import com.tongtech.client.consumer.PullResult;
import com.tongtech.client.consumer.listener.ConsumeConcurrentlyContext;
import com.tongtech.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.tongtech.client.core.HTPPushConsumerLifecycleListener;
import com.tongtech.client.exception.HTPException;
import com.tongtech.client.htp.consumer.HTPPushConsumer;
import com.tongtech.client.support.HTPListenerConcurrently;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@HTPMessageListener(consumerGroup = "outputbusinessmessage", topic = "outputbusinessmessage", namespace = "zhongken")
/* loaded from: input_file:com/els/modules/message/listener/BusinessMessageConcumerListener.class */
public class BusinessMessageConcumerListener implements HTPListenerConcurrently, HTPPushConsumerLifecycleListener {
    private static final Logger log = LoggerFactory.getLogger(BusinessMessageConcumerListener.class);

    @Resource
    private InvokeBaseRpcService invokeBaseRpcService;

    @Resource
    private MsgService msgService;
    private HTPPushConsumer pushConsumer;

    public ConsumeConcurrentlyStatus onMessage(PullResult pullResult, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        log.info("push消息成功,结果：" + pullResult);
        pullResult.getMsgFoundList().forEach(messageExt -> {
            JSONObject parseObject;
            try {
                String msgId = messageExt.getMsgId();
                try {
                    try {
                        parseObject = JSONObject.parseObject(new String(messageExt.getBody()));
                    } catch (Throwable th) {
                        this.invokeBaseRpcService.updateRecord(msgId, "", "");
                        throw th;
                    }
                } catch (Exception e) {
                    this.invokeBaseRpcService.updateRecord(msgId, "2", e.getMessage());
                }
                if (!parseObject.containsKey("busAccount")) {
                    throw new ELSBootException("busAccount can not be blank");
                }
                if (!parseObject.containsKey("msgKey")) {
                    throw new ELSBootException("msgKey can not be blank");
                }
                if (!parseObject.containsKey("businessType")) {
                    throw new ELSBootException("businessType can not be blank");
                }
                if (!parseObject.containsKey("operateType")) {
                    throw new ELSBootException("operateType can not be blank");
                }
                if (!parseObject.containsKey("receiveList")) {
                    throw new ELSBootException("receiveList can not be blank");
                }
                if (!parseObject.containsKey("sendObj")) {
                    throw new ELSBootException("sendObj can not be blank");
                }
                if (!parseObject.containsKey("businessObj")) {
                    throw new ELSBootException("businessObj can not be blank");
                }
                this.msgService.sendMsg(parseObject);
                this.invokeBaseRpcService.updateRecord(msgId, "1", "");
                this.pushConsumer.acknowledge(messageExt);
            } catch (HTPException e2) {
                throw new RuntimeException((Throwable) e2);
            }
        });
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    public void prepareStart(HTPPushConsumer hTPPushConsumer) {
        this.pushConsumer = hTPPushConsumer;
        this.pushConsumer.setClientAcknowledge();
    }
}
