001package io.jboot.support.seata.tcc; 002 003/** 004 * @author zhangxn 005 * @date 2022/5/30 21:32 006 */ 007 008import com.jfinal.plugin.activerecord.Db; 009import com.jfinal.plugin.activerecord.DbPro; 010import com.jfinal.plugin.activerecord.IAtom; 011import io.jboot.db.datasource.DataSourceBuilder; 012import io.jboot.db.datasource.DataSourceConfig; 013import io.jboot.db.datasource.DataSourceConfigManager; 014import io.jboot.utils.StrUtil; 015import io.seata.common.exception.FrameworkErrorCode; 016import io.seata.common.thread.NamedThreadFactory; 017import io.seata.rm.tcc.TwoPhaseResult; 018import io.seata.rm.tcc.constant.TCCFenceConstant; 019import io.seata.rm.tcc.exception.TCCFenceException; 020import io.seata.rm.tcc.store.TCCFenceDO; 021import io.seata.rm.tcc.store.TCCFenceStore; 022import io.seata.rm.tcc.store.db.TCCFenceStoreDataBaseDAO; 023import org.slf4j.Logger; 024import org.slf4j.LoggerFactory; 025 026import javax.sql.DataSource; 027import java.lang.reflect.Method; 028import java.sql.Connection; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.LinkedBlockingQueue; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033 034/** 035 * TCC Fence Handler(idempotent, non_rollback, suspend) 036 * 037 * @author kaka2code 038 */ 039public class TCCFenceHandler { 040 041 private TCCFenceHandler() { 042 throw new IllegalStateException("Utility class"); 043 } 044 045 private static final Logger LOGGER = LoggerFactory.getLogger(io.seata.rm.tcc.TCCFenceHandler.class); 046 047 private static final TCCFenceStore TCC_FENCE_DAO = TCCFenceStoreDataBaseDAO.getInstance(); 048 049 private static DataSource dataSource; 050 051 052 private static final int MAX_THREAD_CLEAN = 1; 053 054 private static final int MAX_QUEUE_SIZE = 500; 055 056 private static final LinkedBlockingQueue<FenceLogIdentity> LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); 057 058 private static FenceLogCleanRunnable fenceLogCleanRunnable; 059 060 private static ExecutorService logCleanExecutor; 061 062 static { 063 try { 064 initLogCleanExecutor(); 065 } catch (Exception e) { 066 LOGGER.error("init fence log clean executor error", e); 067 } 068 } 069 070 /** 071 * tcc prepare method enhanced 072 * 073 * @param xid the global transaction id 074 * @param branchId the branch transaction id 075 * @param actionName the action name 076 * @return the boolean 077 */ 078 public static Object prepareFence(String xid, Long branchId, String actionName) { 079 DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); 080 DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); 081 IAtom runnable = () -> { 082 Connection connection = dataSource.getConnection(); 083 boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_TRIED); 084 LOGGER.info("TCC fence prepare result: {}. xid: {}, branchId: {}", result, xid, branchId); 085 if (!result) { 086 throw new TCCFenceException(String.format("Insert tcc fence record error, prepare fence failed. xid= %s, branchId= %s", xid, branchId), 087 FrameworkErrorCode.InsertRecordError); 088 } 089 return result; 090 }; 091 DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); 092 return dbPro.tx(runnable); 093 } 094 095 /** 096 * tcc commit method enhanced 097 * 098 * @param commitMethod commit method 099 * @param targetTCCBean target tcc bean 100 * @param xid the global transaction id 101 * @param branchId the branch transaction id 102 * @param args commit method's parameters 103 * @return the boolean 104 */ 105 public static boolean commitFence(Method commitMethod, Object targetTCCBean, 106 String xid, Long branchId, Object[] args) { 107 108 DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); 109 DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); 110 IAtom runnable = () -> { 111 Connection connection = dataSource.getConnection(); 112 TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId); 113 if (tccFenceDO == null){ 114 throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), 115 FrameworkErrorCode.InsertRecordError); 116 } 117 if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) { 118 LOGGER.info("Branch transaction has already committed before. idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); 119 return true; 120 } 121 if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) { 122 if (LOGGER.isWarnEnabled()) { 123 LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); 124 } 125 return false; 126 } 127 try { 128 return updateStatusAndInvokeTargetMethod(connection, commitMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_COMMITTED, args); 129 } catch (Exception ex) { 130 throw new TCCFenceException(ex.getCause()); 131 } 132 }; 133 DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); 134 return dbPro.tx(runnable); 135 } 136 137 /** 138 * tcc rollback method enhanced 139 * 140 * @param rollbackMethod rollback method 141 * @param targetTCCBean target tcc bean 142 * @param xid the global transaction id 143 * @param branchId the branch transaction id 144 * @param args rollback method's parameters 145 * @param actionName the action name 146 * @return the boolean 147 */ 148 public static boolean rollbackFence(Method rollbackMethod, Object targetTCCBean, 149 String xid, Long branchId, Object[] args, String actionName) { 150 DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); 151 DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); 152 IAtom runnable = () -> { 153 try { 154 Connection connection = dataSource.getConnection(); 155 TCCFenceDO tccFenceDO = TCC_FENCE_DAO.queryTCCFenceDO(connection, xid, branchId); 156 if (tccFenceDO == null){ 157 boolean result = insertTCCFenceLog(connection, xid, branchId, actionName, TCCFenceConstant.STATUS_SUSPENDED); 158 LOGGER.info("Insert tcc fence record result: {}. xid: {}, branchId: {}", result, xid, branchId); 159 if (!result) { 160 throw new TCCFenceException(String.format("Insert tcc fence record error, rollback fence method failed. xid= %s, branchId= %s", xid, branchId), 161 FrameworkErrorCode.InsertRecordError); 162 } 163 return true; 164 } else { 165 if (TCCFenceConstant.STATUS_ROLLBACKED == tccFenceDO.getStatus() || TCCFenceConstant.STATUS_SUSPENDED == tccFenceDO.getStatus()) { 166 LOGGER.info("Branch transaction had already rollbacked before, idempotency rejected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); 167 return true; 168 } 169 if (TCCFenceConstant.STATUS_COMMITTED == tccFenceDO.getStatus()) { 170 if (LOGGER.isWarnEnabled()) { 171 LOGGER.warn("Branch transaction status is unexpected. xid: {}, branchId: {}, status: {}", xid, branchId, tccFenceDO.getStatus()); 172 } 173 return false; 174 } 175 } 176 return updateStatusAndInvokeTargetMethod(connection, rollbackMethod, targetTCCBean, xid, branchId, TCCFenceConstant.STATUS_ROLLBACKED, args); 177 } catch (Throwable ex) { 178 throw new TCCFenceException(ex.getCause()); 179 } 180 }; 181 DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); 182 return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable); 183 } 184 185 /** 186 * Insert TCC fence log 187 * 188 * @param conn the db connection 189 * @param xid the xid 190 * @param branchId the branchId 191 * @param status the status 192 * @return the boolean 193 */ 194 private static boolean insertTCCFenceLog(Connection conn, String xid, Long branchId, String actionName, Integer status) { 195 TCCFenceDO tccFenceDO = new TCCFenceDO(); 196 tccFenceDO.setXid(xid); 197 tccFenceDO.setBranchId(branchId); 198 tccFenceDO.setActionName(actionName); 199 tccFenceDO.setStatus(status); 200 return TCC_FENCE_DAO.insertTCCFenceDO(conn, tccFenceDO); 201 } 202 203 /** 204 * Update TCC Fence status and invoke target method 205 * 206 * @param method target method 207 * @param targetTCCBean target bean 208 * @param xid the global transaction id 209 * @param branchId the branch transaction id 210 * @param status the tcc fence status 211 * @return the boolean 212 */ 213 private static boolean updateStatusAndInvokeTargetMethod(Connection conn, Method method, Object targetTCCBean, 214 String xid, Long branchId, int status, Object[] args) throws Exception { 215 boolean result = TCC_FENCE_DAO.updateTCCFenceDO(conn, xid, branchId, status, TCCFenceConstant.STATUS_TRIED); 216 if (result) { 217 // invoke two phase method 218 Object ret = method.invoke(targetTCCBean, args); 219 if (null != ret) { 220 if (ret instanceof TwoPhaseResult) { 221 result = ((TwoPhaseResult) ret).isSuccess(); 222 } else { 223 result = (boolean) ret; 224 } 225 } 226 } 227 return result; 228 } 229 230 private static void initLogCleanExecutor() { 231 logCleanExecutor = new ThreadPoolExecutor(MAX_THREAD_CLEAN, MAX_THREAD_CLEAN, Integer.MAX_VALUE, 232 TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), 233 new NamedThreadFactory("fenceLogCleanThread", MAX_THREAD_CLEAN, true) 234 ); 235 fenceLogCleanRunnable = new FenceLogCleanRunnable(); 236 logCleanExecutor.submit(fenceLogCleanRunnable); 237 } 238 239 /** 240 * Delete TCC Fence 241 * 242 * @param xid the global transaction id 243 * @param branchId the branch transaction id 244 * @return the boolean 245 */ 246 public static boolean deleteFence(String xid, Long branchId) { 247 248 DataSourceConfig dataSourceConfig = DataSourceConfigManager.me().getMainDatasourceConfig(); 249 DataSource dataSource = new DataSourceBuilder(dataSourceConfig).build(); 250 IAtom runnable = () -> { 251 try { 252 Connection connection = dataSource.getConnection(); 253 boolean ret = TCC_FENCE_DAO.deleteTCCFenceDO(connection, xid, branchId); 254 return ret; 255 } catch (Throwable ex) { 256 return false; 257 } 258 }; 259 DbPro dbPro = StrUtil.isBlank(dataSourceConfig.getName()) ? Db.use() : Db.use(dataSourceConfig.getName()); 260 return dbPro.tx(Connection.TRANSACTION_READ_UNCOMMITTED, runnable); 261 } 262 263 private static void addToLogCleanQueue(final String xid, final long branchId) { 264 FenceLogIdentity logIdentity = new FenceLogIdentity(); 265 logIdentity.setXid(xid); 266 logIdentity.setBranchId(branchId); 267 try { 268 LOG_QUEUE.add(logIdentity); 269 } catch (Exception e) { 270 LOGGER.warn("Insert tcc fence record into queue for async delete error,xid:{},branchId:{}", xid, branchId, e); 271 } 272 } 273 274 /** 275 * clean fence log that has the final status runnable. 276 * 277 * @see TCCFenceConstant 278 */ 279 private static class FenceLogCleanRunnable implements Runnable { 280 @Override 281 public void run() { 282 while (true) { 283 try { 284 FenceLogIdentity logIdentity = LOG_QUEUE.take(); 285 boolean ret = deleteFence(logIdentity.getXid(), logIdentity.getBranchId()); 286 if (!ret) { 287 LOGGER.error("delete fence log failed, xid: {}, branchId: {}", logIdentity.getXid(), logIdentity.getBranchId()); 288 } 289 } catch (InterruptedException e) { 290 LOGGER.error("take fence log from queue for clean be interrupted", e); 291 } catch (Exception e) { 292 LOGGER.error("exception occur when clean fence log", e); 293 } 294 } 295 } 296 } 297 298 private static class FenceLogIdentity { 299 /** 300 * the global transaction id 301 */ 302 private String xid; 303 304 /** 305 * the branch transaction id 306 */ 307 private Long branchId; 308 309 public String getXid() { 310 return xid; 311 } 312 313 public Long getBranchId() { 314 return branchId; 315 } 316 317 public void setXid(String xid) { 318 this.xid = xid; 319 } 320 321 public void setBranchId(Long branchId) { 322 this.branchId = branchId; 323 } 324 } 325}