001/* 002 * Copyright 2016 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.schedule; 017 018import java.time.Duration; 019import java.time.Instant; 020import java.util.Map; 021import java.util.Optional; 022 023import org.apache.avalon.framework.context.Context; 024import org.apache.avalon.framework.context.ContextException; 025import org.apache.avalon.framework.service.ServiceException; 026import org.apache.avalon.framework.service.ServiceManager; 027import org.apache.cocoon.Constants; 028import org.apache.cocoon.environment.ObjectModelHelper; 029import org.apache.cocoon.environment.Request; 030import org.apache.cocoon.environment.background.BackgroundEnvironment; 031import org.apache.cocoon.util.log.SLF4JLoggerAdapter; 032import org.quartz.DisallowConcurrentExecution; 033import org.quartz.Job; 034import org.quartz.JobDataMap; 035import org.quartz.JobDetail; 036import org.quartz.JobExecutionContext; 037import org.quartz.JobExecutionException; 038import org.quartz.JobKey; 039import org.quartz.PersistJobDataAfterExecution; 040import org.quartz.SchedulerException; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044import org.ametys.core.authentication.AuthenticateAction; 045import org.ametys.core.engine.BackgroundEngineHelper; 046import org.ametys.core.schedule.Runnable.FireProcess; 047import org.ametys.core.user.UserIdentity; 048import org.ametys.core.user.UserManager; 049import org.ametys.core.user.population.UserPopulationDAO; 050import org.ametys.plugins.core.schedule.Scheduler; 051import org.ametys.plugins.core.user.UserDAO; 052 053/** 054 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable} 055 */ 056@PersistJobDataAfterExecution 057@DisallowConcurrentExecution 058public class AmetysJob implements Job 059{ 060 /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */ 061 public static final String KEY_LAST_DURATION = "duration"; 062 /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */ 063 public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime"; 064 /** The key for the success state of the last execution of the job */ 065 public static final String KEY_SUCCESS = "success"; 066 067 /** The service manager */ 068 protected static ServiceManager _serviceManager; 069 /** The extension point for {@link Schedulable}s */ 070 protected static SchedulableExtensionPoint _schedulableEP; 071 /** The scheduler component */ 072 protected static Scheduler _scheduler; 073 /** The cocoon environment context. */ 074 protected static org.apache.cocoon.environment.Context _environmentContext; 075 /** The user manager component */ 076 protected static UserManager _userManager; 077 078 /** 079 * Initialize the static fields. 080 * @param serviceManager The service manager 081 * @param context The context 082 * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint} 083 * @throws ContextException if environment context object not found 084 */ 085 public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException 086 { 087 _serviceManager = serviceManager; 088 _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE); 089 _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE); 090 _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 091 _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE); 092 } 093 094 /** 095 * Releases and destroys used resources 096 */ 097 public static void dispose() 098 { 099 _serviceManager = null; 100 _schedulableEP = null; 101 _scheduler = null; 102 _environmentContext = null; 103 _userManager = null; 104 } 105 106 @Override 107 public void execute(JobExecutionContext context) throws JobExecutionException 108 { 109 JobDetail detail = context.getJobDetail(); 110 JobDataMap jobDataMap = detail.getJobDataMap(); 111 JobKey jobKey = detail.getKey(); 112 String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID); 113 String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID); 114 Schedulable schedulable = _schedulableEP.getExtension(schedulableId); 115 Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId); 116 117 // Concurrency allowed ? 118 if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger)) 119 { 120 logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId); 121 return; 122 } 123 124 boolean success = true; 125 126 // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future) 127 jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation 128 129 Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger)); 130 131 String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY); 132 if (!_setUserInSession(environmentInformation, userIdentityAsString)) 133 { 134 success = false; 135 String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId); 136 logger.error(message); 137 throw new JobExecutionException(message); 138 } 139 140 logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap().toString()); 141 Instant start = Instant.now(); 142 try 143 { 144 schedulable.execute(context); 145 } 146 catch (Exception e) 147 { 148 success = false; 149 logger.error(String.format("An exception occured during the execution of the Schedulable '%s'", schedulableId), e); 150 throw new JobExecutionException(String.format("An exception occured during the execution of the job '%s'", schedulableId), e); 151 } 152 catch (Throwable t) 153 { 154 success = false; 155 logger.error(String.format("An error occured during the execution of the Schedulable '%s'", schedulableId), t); 156 throw t; 157 } 158 finally 159 { 160 // Set the duration in the map 161 Instant end = Instant.now(); 162 long duration = Duration.between(start, end).toMillis(); 163 jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation 164 165 // Leave the Engine Environment 166 if (environmentInformation != null) 167 { 168 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 169 } 170 171 // Success ? 172 jobDataMap.put(KEY_SUCCESS, success); 173 174 // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it 175 if (FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS))) 176 { 177 jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true); 178 } 179 } 180 } 181 182 private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString) 183 { 184 // We set the empty user to the system user 185 UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString) 186 .map(UserIdentity::stringToUserIdentity) 187 .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY); 188 189 // Invalid user identity 190 if (_userManager.getUser(userIdentity) == null) 191 { 192 return false; 193 } 194 195 BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment"); 196 Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel()); 197 198 AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true); 199 200 return true; 201 } 202 203 private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException 204 { 205 try 206 { 207 return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream() 208 .map(JobExecutionContext::getJobDetail) 209 .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this 210 .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID)) 211 .filter(id -> id.equals(schedulableId)) 212 .findFirst() 213 .isPresent(); 214 } 215 catch (SchedulerException e) 216 { 217 logger.error(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e); 218 throw new JobExecutionException(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e); 219 } 220 } 221}