/*
 *  Copyright 2016 Anyware Services
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.ametys.core.schedule;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import org.apache.avalon.framework.context.Context;
import org.apache.avalon.framework.context.ContextException;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.cocoon.Constants;
import org.apache.cocoon.environment.ObjectModelHelper;
import org.apache.cocoon.environment.Request;
import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.ametys.core.authentication.AuthenticateAction;
import org.ametys.core.engine.BackgroundEngineHelper;
import org.ametys.core.engine.BackgroundEnvironment;
import org.ametys.core.schedule.Runnable.FireProcess;
import org.ametys.core.schedule.progression.ContainerProgressionTracker;
import org.ametys.core.schedule.progression.ProgressionTrackerFactory;
import org.ametys.core.user.UserIdentity;
import org.ametys.core.user.UserManager;
import org.ametys.core.user.population.UserPopulationDAO;
import org.ametys.plugins.core.impl.schedule.AbstractStaticSchedulable;
import org.ametys.plugins.core.schedule.Scheduler;
import org.ametys.plugins.core.user.UserDAO;
import org.ametys.runtime.servlet.RuntimeServlet;
import org.ametys.runtime.servlet.RuntimeServlet.RunMode;

/**
 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable}
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class AmetysJob implements Job
{
    /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */
    public static final String KEY_LAST_DURATION = "duration";
    /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */
    public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime";
    /** The key for the success state of the last execution of the job */
    public static final String KEY_SUCCESS = "success";
    
    /** The service manager */
    protected static ServiceManager _serviceManager;
    /** The extension point for {@link Schedulable}s */
    protected static SchedulableExtensionPoint _schedulableEP;
    /** The scheduler component */
    protected static Scheduler _scheduler;
    /** The cocoon environment context. */
    protected static org.apache.cocoon.environment.Context _environmentContext;
    /** The user manager component */
    protected static UserManager _userManager;
    
    /**
     * Initialize the static fields.
     * @param serviceManager The service manager
     * @param context The context
     * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint}
     * @throws ContextException if environment context object not found
     */
    public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException
    {
        _serviceManager = serviceManager;
        _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE);
        _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE);
        _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
        _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE);
    }
    
    /**
     * Releases and destroys used resources
     */
    public static void dispose()
    {
        _serviceManager = null;
        _schedulableEP = null;
        _scheduler = null;
        _environmentContext = null;
        _userManager = null;
    }
    
    /**
     * Runs the {@link Schedulable} referenced by the job data map inside a background engine environment.
     * <p>
     * The method refuses to run while the application is stopping, silently skips the run when the
     * Schedulable forbids concurrent execution and another job of the same Schedulable is running,
     * and otherwise stores the fire time, the duration and the success state in the {@link JobDataMap}.
     * @param context The Quartz execution context
     * @throws JobExecutionException if the application is stopping, the Schedulable cannot be found,
     *         the configured user is not valid, the concurrency check fails or the Schedulable throws an exception
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException
    {
        if (RuntimeServlet.getRunMode().equals(RunMode.STOPPING))
        {
            throw new JobExecutionException("Application is currently stopping. Impossible to run a job.");
        }
        
        JobDetail detail = context.getJobDetail();
        JobDataMap jobDataMap = detail.getJobDataMap();
        JobKey jobKey = detail.getKey();
        String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID);
        String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID);
        Schedulable schedulable = _schedulableEP.getExtension(schedulableId);
        Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId);
        
        // Guard against a null lookup result so that the failure is reported with an explicit message instead of a NullPointerException
        if (schedulable == null)
        {
            String message = String.format("The Schedulable '%s' referenced by the Runnable '%s' cannot be found", schedulableId, runnableId);
            logger.error(message);
            throw new JobExecutionException(message);
        }
        
        // Concurrency allowed ?
        if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger))
        {
            logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId);
            return;
        }
        
        // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future)
        jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation
        
        Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger));
        
        // Only switched to true once the Schedulable has returned normally
        boolean success = false;
        // Stays null until the Schedulable is actually started (i.e. the user check passed)
        Instant start = null;
        
        // Everything after entering the environment is under the try so that the environment is always left
        try
        {
            String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY);
            if (!_setUserInSession(environmentInformation, userIdentityAsString))
            {
                String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId);
                logger.error(message);
                throw new JobExecutionException(message);
            }
            
            logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap());
            start = Instant.now();
            
            _runSchedulable(schedulable, schedulableId, context, logger);
            success = true;
        }
        finally
        {
            boolean started = start != null;
            
            if (started)
            {
                // Set the duration in the map
                long duration = Duration.between(start, Instant.now()).toMillis();
                jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation
            }
            
            BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
            
            // Success ?
            jobDataMap.put(KEY_SUCCESS, success);
            
            // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it
            // (not flagged when the Schedulable was never started because of an invalid user)
            if (started && FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS)))
            {
                jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true);
            }
        }
    }
    
    /**
     * Creates the progression tracker, registers it in the execution context and executes the Schedulable.
     * @param schedulable The Schedulable to execute
     * @param schedulableId The id of the Schedulable, used in log and exception messages
     * @param context The Quartz execution context
     * @param logger The logger of the job
     * @throws JobExecutionException wrapping any {@link Exception} thrown during the execution
     */
    private void _runSchedulable(Schedulable schedulable, String schedulableId, JobExecutionContext context, Logger logger) throws JobExecutionException
    {
        try
        {
            ContainerProgressionTracker progressionTracker = ProgressionTrackerFactory.createContainerProgressionTracker(schedulable.getLabel(), logger);
            context.put(AbstractStaticSchedulable.PROGRESSION_TRACKER, progressionTracker);
            
            schedulable.execute(context, progressionTracker);
        }
        catch (Exception e)
        {
            String msg = String.format("An exception occurred during the execution of the Schedulable '%s'", schedulableId);
            logger.error(msg, e);
            throw new JobExecutionException(msg, e);
        }
        catch (Throwable t)
        {
            // Anything that is not an Exception is logged and rethrown unchanged
            logger.error(String.format("An error occurred during the execution of the Schedulable '%s'", schedulableId), t);
            throw t;
        }
    }
    
    /**
     * Resolves the user the job must run as and sets it in the session of the background request.
     * @param environmentInformation The information returned when the engine environment was entered
     * @param userIdentityAsString The string representation of the user identity. Can be null.
     * @return false if the user manager knows no user for the resolved identity, true once the identity has been set in session
     */
    private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString)
    {
        // For legacy purpose, we set the empty user to the system user
        // NOTE(review): the fallback also applies if stringToUserIdentity returns null for a non-null string - confirm this is intended
        UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString)
                                            .map(UserIdentity::stringToUserIdentity)
                                            .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
        
        // Invalid user identity
        if (_userManager.getUser(userIdentity) == null)
        {
            return false;
        }
        
        BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
        Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
        
        AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
        
        return true;
    }
    
    /**
     * Tells whether another job of the same Schedulable is currently executing.
     * @param schedulableId The id of the Schedulable of the current job
     * @param runnableId The id of the current Runnable, used in log and exception messages
     * @param currentJobKey The key of the current job, which is excluded from the check
     * @param logger The logger of the job
     * @return true if a currently executing job, other than the current one, references the same Schedulable id
     * @throws JobExecutionException if the currently executing jobs cannot be retrieved from the scheduler
     */
    private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException
    {
        try
        {
            return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream()
                    .map(JobExecutionContext::getJobDetail)
                    .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this
                    .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID))
                    // a running job whose data map has no Schedulable id entry yields null here and must not match
                    .anyMatch(id -> id != null && id.equals(schedulableId));
        }
        catch (SchedulerException e)
        {
            String message = String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId);
            logger.error(message, e);
            throw new JobExecutionException(message, e);
        }
    }
}