转载

activiti高并发之id生成器

默认的id生成策略

默认采用的是org.activiti.engine.impl.db.DbIdGenerator

/**
 * Id generator that serves numeric ids from an in-memory range and asks the
 * command executor for a fresh range, via {@link GetNextIdBlockCmd}, once the
 * current one is used up.
 */
public class DbIdGenerator implements IdGenerator {

  /** Size passed to {@link GetNextIdBlockCmd} whenever a new block is requested. */
  protected int idBlockSize;
  /** Next id to hand out. */
  protected long nextId = 0;
  /** Last id of the current block; starts below {@code nextId} so the first call fetches a block. */
  protected long lastId = -1;

  protected CommandExecutor commandExecutor;
  protected CommandConfig commandConfig;

  /**
   * Returns the next id as a decimal string, fetching a new block first when
   * {@code nextId} has moved past {@code lastId}.
   *
   * @return the allocated id
   */
  public synchronized String getNextId() {
    boolean blockExhausted = nextId > lastId;
    if (blockExhausted) {
      getNewBlock();
    }
    long allocated = nextId;
    nextId = allocated + 1;
    return String.valueOf(allocated);
  }

  /**
   * Executes a {@link GetNextIdBlockCmd} and adopts the bounds of the returned
   * {@link IdBlock} as the new in-memory range.
   */
  protected synchronized void getNewBlock() {
    GetNextIdBlockCmd fetchBlock = new GetNextIdBlockCmd(idBlockSize);
    IdBlock fetched = commandExecutor.execute(commandConfig, fetchBlock);
    nextId = fetched.getNextId();
    lastId = fetched.getLastId();
  }

  /** @return the configured block size */
  public int getIdBlockSize() {
    return idBlockSize;
  }

  /** @param idBlockSize the block size to request from now on */
  public void setIdBlockSize(int idBlockSize) {
    this.idBlockSize = idBlockSize;
  }

  /** @return the executor used to run the block-fetching command */
  public CommandExecutor getCommandExecutor() {
    return commandExecutor;
  }

  /** @param commandExecutor the executor used to run the block-fetching command */
  public void setCommandExecutor(CommandExecutor commandExecutor) {
    this.commandExecutor = commandExecutor;
  }

  /** @return the command configuration passed along with the block-fetching command */
  public CommandConfig getCommandConfig() {
    return commandConfig;
  }

  /** @param commandConfig the command configuration passed along with the block-fetching command */
  public void setCommandConfig(CommandConfig commandConfig) {
    this.commandConfig = commandConfig;
  }
}

每次从数据库中取出一段

/**
 * Command that reserves a block of ids by advancing the {@code "next.dbid"}
 * property by {@code idBlockSize}.
 */
public class GetNextIdBlockCmd implements Command<IdBlock> {

  private static final long serialVersionUID = 1L;
  protected int idBlockSize;

  /**
   * @param idBlockSize how far the {@code "next.dbid"} property is advanced per execution
   */
  public GetNextIdBlockCmd(int idBlockSize) {
    this.idBlockSize = idBlockSize;
  }

  /**
   * Reads the current {@code "next.dbid"} value, stores the value increased by
   * {@code idBlockSize} back on the property entity, and returns the range in between.
   *
   * @param commandContext context used to look up the property entity
   * @return a block running from the old value to the new value minus one
   */
  public IdBlock execute(CommandContext commandContext) {
    PropertyEntity nextDbIdProperty =
        (PropertyEntity) commandContext.getPropertyEntityManager().findPropertyById("next.dbid");

    long blockStart = Long.parseLong(nextDbIdProperty.getValue());
    long followingBlockStart = blockStart + idBlockSize;

    // Only the entity is modified here; no explicit database write is issued in this method.
    nextDbIdProperty.setValue(String.valueOf(followingBlockStart));

    return new IdBlock(blockStart, followingBlockStart - 1);
  }
}

在CommandContextInterceptor里头,拦截了命令

public class CommandContextInterceptor extends AbstractCommandInterceptor {   private static final Logger log = LoggerFactory.getLogger(CommandContextInterceptor.class);    protected CommandContextFactory commandContextFactory;   protected ProcessEngineConfigurationImpl processEngineConfiguration;    public CommandContextInterceptor() {   }    public CommandContextInterceptor(CommandContextFactory commandContextFactory, ProcessEngineConfigurationImpl processEngineConfiguration) {     this.commandContextFactory = commandContextFactory;     this.processEngineConfiguration = processEngineConfiguration;   }    public <T> T execute(CommandConfig config, Command<T> command) {     CommandContext context = Context.getCommandContext();          boolean contextReused = false;     // We need to check the exception, because the transaction can be in a rollback state,     // and some other command is being fired to compensate (eg. decrementing job retries)     if (!config.isContextReusePossible() || context == null || context.getException() != null) {          context = commandContextFactory.createCommandContext(command);             }       else {         log.debug("Valid context found. 
Reusing it for the current command '{}'", command.getClass().getCanonicalName());         contextReused = true;     }      try {       // Push on stack       Context.setCommandContext(context);       Context.setProcessEngineConfiguration(processEngineConfiguration);              return next.execute(config, command);            } catch (Exception e) {                context.exception(e);            } finally {       try {           if (!contextReused) {               context.close();           }       } finally {           // Pop from stack           Context.removeCommandContext();           Context.removeProcessEngineConfiguration();           Context.removeBpmnOverrideContext();       }     }          return null;   }      public CommandContextFactory getCommandContextFactory() {     return commandContextFactory;   }      public void setCommandContextFactory(CommandContextFactory commandContextFactory) {     this.commandContextFactory = commandContextFactory;   }    public ProcessEngineConfigurationImpl getProcessEngineConfiguration() {     return processEngineConfiguration;   }    public void setProcessEngineContext(ProcessEngineConfigurationImpl processEngineContext) {     this.processEngineConfiguration = processEngineContext;   }

在execute方法的finally块中,如果上下文不是复用的(即!contextReused),就会调用context.close()方法

/**
 * Closes this command context: notifies close listeners, flushes the sessions,
 * commits or rolls back the transaction context, closes the sessions, and finally
 * rethrows any exception that was collected along the way.
 *
 * <p>Each step runs only while the {@code exception} member is still {@code null};
 * every failure is passed to {@code exception(Throwable)} (presumably recording it
 * in that member - confirm against its implementation).
 *
 * @throws Error             if the collected exception is an {@code Error}
 * @throws RuntimeException  if the collected exception is a {@code RuntimeException}
 * @throws ActivitiException wrapping any other collected exception
 */
public void close() {
  // the intention of this method is that all resources are closed properly, even
  // if exceptions occur in close or flush methods of the sessions or the
  // transaction context.

  try {
    try {
      try {

        // Step 1: tell listeners the context is about to close.
        if (exception == null && closeListeners != null) {
          try {
            for (CommandContextCloseListener listener : closeListeners) {
              listener.closing(this);
            }
          } catch (Throwable exception) {
            // The catch parameter shadows the member of the same name.
            exception(exception);
          }
        }

        // Step 2: flush the sessions, only when nothing has failed so far.
        if (exception == null) {
          flushSessions();
        }

      } catch (Throwable exception) {
        exception(exception);
      } finally {

        // Step 3: commit, only when nothing has failed so far.
        try {
          if (exception == null) {
            transactionContext.commit();
          }
        } catch (Throwable exception) {
          exception(exception);
        }

        // Step 4: tell listeners the context has been closed (skipped after any failure).
        if (exception == null && closeListeners != null) {
          try {
            for (CommandContextCloseListener listener : closeListeners) {
              listener.closed(this);
            }
          } catch (Throwable exception) {
            exception(exception);
          }
        }

        // Step 5: on failure, log at a level that depends on the exception type, then roll back.
        if (exception != null) {
          if (exception instanceof JobNotFoundException || exception instanceof ActivitiTaskAlreadyClaimedException) {
            // reduce log level, because this may have been caused because of job deletion due to cancelActiviti="true"
            log.info("Error while closing command context", exception);
          } else if (exception instanceof ActivitiOptimisticLockingException) {
            // reduce log level, as normally we're not interested in logging this exception
            log.debug("Optimistic locking exception : " + exception);
          } else {
            log.debug("Error while closing command context", exception);
          }

          transactionContext.rollback();
        }
      }
    } catch (Throwable exception) {
      exception(exception);
    } finally {
      // Step 6: sessions are closed whether or not the earlier steps failed.
      closeSessions();
    }
  } catch (Throwable exception) {
    exception(exception);
  }

  // rethrow the original exception if there was one
  if (exception != null) {
    if (exception instanceof Error) {
      throw (Error) exception;
    } else if (exception instanceof RuntimeException) {
      throw (RuntimeException) exception;
    } else {
      throw new ActivitiException("exception while executing command " + command, exception);
    }
  }
}

close方法在没有异常的情况下会调用flushSessions(),进而执行下面的flush()方法

/**
 * Flushes pending work in a fixed order: inserts, then updates, then deletes.
 *
 * <p>Before flushing, unnecessary operations are removed, deserialized objects are
 * flushed and the list of updated objects is collected. When debug logging is on,
 * every pending operation and a summary are logged first.
 */
public void flush() {
  List<DeleteOperation> removedOperations = removeUnnecessaryOperations();

  flushDeserializedObjects();
  List<PersistentObject> pendingUpdates = getUpdatedObjects();

  if (log.isDebugEnabled()) {
    logPendingOperations(pendingUpdates);
  }

  flushInserts();
  flushUpdates(pendingUpdates);
  flushDeletes(removedOperations);
}

/** Logs each pending insert, update and delete at debug level, followed by a count summary. */
private void logPendingOperations(List<PersistentObject> pendingUpdates) {
  int insertCount = 0;
  for (List<PersistentObject> sameTypeInserts : insertedObjects.values()) {
    for (PersistentObject inserted : sameTypeInserts) {
      log.debug("  insert {}", inserted);
      insertCount++;
    }
  }

  int updateCount = 0;
  for (PersistentObject updated : pendingUpdates) {
    log.debug("  update {}", updated);
    updateCount++;
  }

  int deleteCount = 0;
  for (DeleteOperation pendingDelete : deleteOperations) {
    log.debug("  {}", pendingDelete);
    deleteCount++;
  }

  log.debug("flush summary: {} insert, {} update, {} delete.", insertCount, updateCount, deleteCount);
  log.debug("now executing flush...");
}

flush()方法中会调用flushUpdates,对next.dbid的修改要到这一步才会被更新到数据库。因此在高并发的场景下,可能一个线程读取一段block之后,还没有来得及执行update,同一个next.dbid的值就已经被另一个线程读取,两者拿到相同的id区间,造成id已经被占用的情况。为解决高并发下的这个问题,可以采用uuid策略。

uuid策略

org.activiti.engine.impl.persistence.StrongUuidGenerator

public class StrongUuidGenerator implements IdGenerator {    // different ProcessEngines on the same classloader share one generator.   protected static TimeBasedGenerator timeBasedGenerator;    public StrongUuidGenerator() {     ensureGeneratorInitialized();   }    protected void ensureGeneratorInitialized() {     if (timeBasedGenerator == null) {       synchronized (StrongUuidGenerator.class) {         if (timeBasedGenerator == null) {           timeBasedGenerator = Generators.timeBasedGenerator(EthernetAddress.fromInterface());         }       }     }   }    public String getNextId() {     return timeBasedGenerator.generate().toString();   }  }

采用的是com.fasterxml.uuid.impl.TimeBasedGenerator

参考

  • UUID id generator for high concurrency

原文  https://segmentfault.com/a/1190000005936564
正文到此结束
Loading...