/*
 *  Copyright 2016 Anyware Services
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.ametys.core.schedule;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;

import org.apache.avalon.framework.context.Context;
import org.apache.avalon.framework.context.ContextException;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.cocoon.Constants;
import org.apache.cocoon.environment.ObjectModelHelper;
import org.apache.cocoon.environment.Request;
import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.PersistJobDataAfterExecution;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.ametys.core.authentication.AuthenticateAction;
import org.ametys.core.engine.BackgroundEngineHelper;
import org.ametys.core.engine.BackgroundEnvironment;
import org.ametys.core.schedule.Runnable.FireProcess;
import org.ametys.core.user.UserIdentity;
import org.ametys.core.user.UserManager;
import org.ametys.core.user.population.UserPopulationDAO;
import org.ametys.plugins.core.schedule.Scheduler;
import org.ametys.plugins.core.user.UserDAO;
import org.ametys.runtime.servlet.RuntimeServlet;
import org.ametys.runtime.servlet.RuntimeServlet.RunMode;

/**
 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable}.
 * <p>
 * The collaborators are held in static fields: they must be set with {@link #initialize(ServiceManager, Context)}
 * before any job is executed, and are cleared by {@link #dispose()}.
 */
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class AmetysJob implements Job
{
    /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */
    public static final String KEY_LAST_DURATION = "duration";
    /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */
    public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime";
    /** The key for the success state of the last execution of the job */
    public static final String KEY_SUCCESS = "success";

    /** The service manager */
    protected static ServiceManager _serviceManager;
    /** The extension point for {@link Schedulable}s */
    protected static SchedulableExtensionPoint _schedulableEP;
    /** The scheduler component */
    protected static Scheduler _scheduler;
    /** The cocoon environment context. */
    protected static org.apache.cocoon.environment.Context _environmentContext;
    /** The user manager component */
    protected static UserManager _userManager;

    /**
     * Initialize the static fields.
     * @param serviceManager The service manager
     * @param context The context
     * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint}
     * @throws ContextException if environment context object not found
     */
    public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException
    {
        _serviceManager = serviceManager;
        _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE);
        _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE);
        _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
        _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE);
    }

    /**
     * Releases and destroys used resources
     */
    public static void dispose()
    {
        _serviceManager = null;
        _schedulableEP = null;
        _scheduler = null;
        _environmentContext = null;
        _userManager = null;
    }

    /**
     * Runs the {@link Schedulable} referenced by the job data map, inside a background engine environment
     * and on behalf of the user stored in the job data map.
     * <p>
     * The following entries are written to the {@link JobDataMap}: {@link #KEY_PREVIOUS_FIRE_TIME} before the run,
     * {@link #KEY_SUCCESS} once the environment has been entered (whatever the outcome), and, when the Schedulable
     * was actually invoked, {@link #KEY_LAST_DURATION} plus the startup-completed flag for startup runnables.
     * @param context The Quartz execution context
     * @throws JobExecutionException if the application is stopping, the Schedulable is unknown, the concurrency
     *         check fails, the configured user is not valid, or the Schedulable throws an exception
     */
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException
    {
        // Constant-first comparison: an undefined run mode must not end in a NullPointerException
        if (RunMode.STOPPING.equals(RuntimeServlet.getRunMode()))
        {
            throw new JobExecutionException("Application is currently stopping. Impossible to run a job.");
        }

        JobDetail detail = context.getJobDetail();
        JobDataMap jobDataMap = detail.getJobDataMap();
        JobKey jobKey = detail.getKey();
        String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID);
        String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID);
        Schedulable schedulable = _schedulableEP.getExtension(schedulableId);
        Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId);

        // Defensive guard: fail with an explicit message instead of a NullPointerException below
        if (schedulable == null)
        {
            String message = String.format("The Schedulable '%s' referenced by the Runnable '%s' does not exist", schedulableId, runnableId);
            logger.error(message);
            throw new JobExecutionException(message);
        }

        // Concurrency allowed ?
        if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger))
        {
            logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId);
            return;
        }

        // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future)
        jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation

        Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger));

        boolean success = false;
        Instant start = null; // stays null as long as the Schedulable has not been invoked
        try
        {
            // The user check is done inside the try block so that the environment is always left, even on failure
            String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY);
            if (!_setUserInSession(environmentInformation, userIdentityAsString))
            {
                String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId);
                logger.error(message);
                throw new JobExecutionException(message);
            }

            logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap());
            start = Instant.now();
            _executeSchedulable(schedulable, context, schedulableId, logger);
            success = true;
        }
        finally
        {
            if (start != null)
            {
                // Set the duration in the map
                long duration = Duration.between(start, Instant.now()).toMillis();
                jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation
            }

            BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);

            // Success ?
            jobDataMap.put(KEY_SUCCESS, success);

            // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it
            if (start != null && FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS)))
            {
                jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true);
            }
        }
    }

    /**
     * Invokes the Schedulable, logging any failure. Exceptions are wrapped in a {@link JobExecutionException};
     * other throwables are logged and rethrown unchanged.
     * @param schedulable The Schedulable to run
     * @param context The Quartz execution context
     * @param schedulableId The id of the Schedulable, used in messages
     * @param logger The logger dedicated to the Schedulable
     * @throws JobExecutionException if the Schedulable throws an exception
     */
    private void _executeSchedulable(Schedulable schedulable, JobExecutionContext context, String schedulableId, Logger logger) throws JobExecutionException
    {
        try
        {
            schedulable.execute(context);
        }
        catch (Exception e)
        {
            String msg = String.format("An exception occurred during the execution of the Schedulable '%s'", schedulableId);
            logger.error(msg, e);
            throw new JobExecutionException(msg, e);
        }
        catch (Throwable t)
        {
            logger.error(String.format("An error occurred during the execution of the Schedulable '%s'", schedulableId), t);
            throw t;
        }
    }

    /**
     * Puts the user the job must run as in the session of the background request.
     * @param environmentInformation The information returned when the background environment was entered
     * @param userIdentityAsString The string representation of the user identity. Can be null.
     * @return false if the user manager does not know the resolved user (nothing is set in that case), true otherwise
     */
    private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString)
    {
        // For legacy purpose, we set the empty user to the system user
        // NOTE(review): if stringToUserIdentity returns null for a malformed string, the system user is used too - confirm this is intended
        UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString)
                .map(UserIdentity::stringToUserIdentity)
                .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);

        // Invalid user identity
        if (_userManager.getUser(userIdentity) == null)
        {
            return false;
        }

        BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
        Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());

        AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);

        return true;
    }

    /**
     * Tells whether another job bound to the same Schedulable is currently executing.
     * @param schedulableId The id of the Schedulable about to be run
     * @param runnableId The id of the Runnable, used in messages
     * @param currentJobKey The key of the current job, which is left out of the check
     * @param logger The logger dedicated to the Schedulable
     * @return true if at least one other executing job references the same Schedulable id
     * @throws JobExecutionException if the scheduler cannot list its executing jobs
     */
    private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException
    {
        try
        {
            return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream()
                    .map(JobExecutionContext::getJobDetail)
                    .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this
                    .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID))
                    // Null-safe: an executing job may carry no Schedulable id in its data map
                    .anyMatch(id -> id != null && id.equals(schedulableId));
        }
        catch (SchedulerException e)
        {
            String message = String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId);
            logger.error(message, e);
            throw new JobExecutionException(message, e);
        }
    }
}