001package io.jboot.support.seata.tcc;
002
003import com.alibaba.fastjson.JSON;
004import io.seata.common.Constants;
005import io.seata.common.exception.ShouldNeverHappenException;
006import io.seata.common.exception.SkipCallbackWrapperException;
007import io.seata.common.util.StringUtils;
008import io.seata.core.exception.TransactionException;
009import io.seata.core.model.BranchStatus;
010import io.seata.core.model.BranchType;
011import io.seata.core.model.Resource;
012import io.seata.rm.AbstractResourceManager;
013import io.seata.rm.tcc.TCCResource;
014import io.seata.rm.tcc.TwoPhaseResult;
015import io.seata.rm.tcc.api.BusinessActionContext;
016
017import java.lang.reflect.Method;
018import java.lang.reflect.UndeclaredThrowableException;
019import java.util.HashMap;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022
023/**
024 * @author zhangxn
025 */
026public class JbootTCCResourceManager extends AbstractResourceManager {
027
028    /**
029     * TCC resource cache
030     */
031    private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<>();
032
033    /**
034     * Instantiates a new Tcc resource manager.
035     */
036    public JbootTCCResourceManager() {
037        // not do anything
038    }
039
040    /**
041     * registry TCC resource
042     *
043     * @param resource The resource to be managed.
044     */
045    @Override
046    public void registerResource(Resource resource) {
047        TCCResource tccResource = (TCCResource)resource;
048        tccResourceCache.put(tccResource.getResourceId(), tccResource);
049        super.registerResource(tccResource);
050    }
051
052    @Override
053    public Map<String, Resource> getManagedResources() {
054        return tccResourceCache;
055    }
056
057    /**
058     * TCC branch commit
059     *
060     * @param branchType
061     * @param xid             Transaction id.
062     * @param branchId        Branch id.
063     * @param resourceId      Resource id.
064     * @param applicationData Application data bind with this branch.
065     * @return BranchStatus
066     * @throws TransactionException TransactionException
067     */
068    @Override
069    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
070                                     String applicationData) throws TransactionException {
071        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
072        if (tccResource == null) {
073            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
074        }
075        Object targetTCCBean = tccResource.getTargetBean();
076        Method commitMethod = tccResource.getCommitMethod();
077        if (targetTCCBean == null || commitMethod == null) {
078            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
079        }
080        try {
081            //BusinessActionContext
082            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
083                    applicationData);
084            Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
085            Object ret;
086            boolean result;
087            // add idempotent and anti hanging
088            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
089                try {
090                    result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
091                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
092                    throw e.getCause();
093                }
094            } else {
095                ret = commitMethod.invoke(targetTCCBean, null);
096                if (ret != null) {
097                    if (ret instanceof TwoPhaseResult) {
098                        result = ((TwoPhaseResult)ret).isSuccess();
099                    } else {
100                        result = (boolean)ret;
101                    }
102                } else {
103                    result = true;
104                }
105            }
106            LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
107            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
108        } catch (Throwable t) {
109            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
110            LOGGER.error(msg, t);
111            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
112        }
113    }
114
115    /**
116     * TCC branch rollback
117     *
118     * @param branchType      the branch type
119     * @param xid             Transaction id.
120     * @param branchId        Branch id.
121     * @param resourceId      Resource id.
122     * @param applicationData Application data bind with this branch.
123     * @return BranchStatus
124     * @throws TransactionException TransactionException
125     */
126    @Override
127    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
128                                       String applicationData) throws TransactionException {
129        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
130        if (tccResource == null) {
131            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
132        }
133        Object targetTCCBean = tccResource.getTargetBean();
134        Method rollbackMethod = tccResource.getRollbackMethod();
135        if (targetTCCBean == null || rollbackMethod == null) {
136            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
137        }
138        try {
139            //BusinessActionContext
140            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
141                    applicationData);
142            Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
143
144            Object ret;
145            boolean result;
146            // add idempotent and anti hanging
147            if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
148                try {
149                    result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId,
150                            args, tccResource.getActionName());
151                } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
152                    throw e.getCause();
153                }
154            } else {
155                ret = rollbackMethod.invoke(targetTCCBean,args);
156                if (ret != null) {
157                    if (ret instanceof TwoPhaseResult) {
158                        result = ((TwoPhaseResult)ret).isSuccess();
159                    } else {
160                        result = (boolean)ret;
161                    }
162                } else {
163                    result = true;
164                }
165            }
166            LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
167            return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
168        } catch (Throwable t) {
169            String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
170            LOGGER.error(msg, t);
171            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
172        }
173    }
174
175    /**
176     * transfer tcc applicationData to BusinessActionContext
177     *
178     * @param xid             the xid
179     * @param branchId        the branch id
180     * @param resourceId      the resource id
181     * @param applicationData the application data
182     * @return business action context
183     */
184    protected BusinessActionContext getBusinessActionContext(String xid, long branchId, String resourceId,
185                                                             String applicationData) {
186        Map actionContextMap = null;
187        if (StringUtils.isNotBlank(applicationData)) {
188            Map tccContext = JSON.parseObject(applicationData, Map.class);
189            actionContextMap = (Map)tccContext.get(Constants.TCC_ACTION_CONTEXT);
190        }
191        if (actionContextMap == null) {
192            actionContextMap = new HashMap<>(2);
193        }
194
195        //instance the action context
196        BusinessActionContext businessActionContext = new BusinessActionContext(
197                xid, String.valueOf(branchId), actionContextMap);
198        businessActionContext.setActionName(resourceId);
199        return businessActionContext;
200    }
201
202    /**
203     * get phase two commit method's args
204     * @param tccResource tccResource
205     * @param businessActionContext businessActionContext
206     * @return args
207     */
208    private Object[] getTwoPhaseCommitArgs(TCCResource tccResource, BusinessActionContext businessActionContext) {
209        String[] keys = tccResource.getPhaseTwoCommitKeys();
210        Class<?>[] argsCommitClasses = tccResource.getCommitArgsClasses();
211        return this.getTwoPhaseMethodParams(keys, argsCommitClasses, businessActionContext);
212    }
213
214    /**
215     * get phase two rollback method's args
216     * @param tccResource tccResource
217     * @param businessActionContext businessActionContext
218     * @return args
219     */
220    private Object[] getTwoPhaseRollbackArgs(TCCResource tccResource, BusinessActionContext businessActionContext) {
221        String[] keys = tccResource.getPhaseTwoRollbackKeys();
222        Class<?>[] argsRollbackClasses = tccResource.getRollbackArgsClasses();
223        return this.getTwoPhaseMethodParams(keys, argsRollbackClasses, businessActionContext);
224    }
225
226    private Object[] getTwoPhaseMethodParams(String[] keys, Class<?>[] argsClasses, BusinessActionContext businessActionContext) {
227        Object[] args = new Object[argsClasses.length];
228        for (int i = 0; i < argsClasses.length; i++) {
229            if (argsClasses[i].equals(BusinessActionContext.class)) {
230                args[i] = businessActionContext;
231            } else {
232                args[i] = businessActionContext.getActionContext(keys[i], argsClasses[i]);
233            }
234        }
235        return args;
236    }
237
238    @Override
239    public BranchType getBranchType() {
240        return BranchType.TCC;
241    }
242}