001/*
002 *  Copyright 2016 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.schedule;
017
018import java.time.Duration;
019import java.time.Instant;
020import java.util.Map;
021import java.util.Optional;
022
023import org.apache.avalon.framework.context.Context;
024import org.apache.avalon.framework.context.ContextException;
025import org.apache.avalon.framework.service.ServiceException;
026import org.apache.avalon.framework.service.ServiceManager;
027import org.apache.cocoon.Constants;
028import org.apache.cocoon.environment.ObjectModelHelper;
029import org.apache.cocoon.environment.Request;
030import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
031import org.quartz.DisallowConcurrentExecution;
032import org.quartz.Job;
033import org.quartz.JobDataMap;
034import org.quartz.JobDetail;
035import org.quartz.JobExecutionContext;
036import org.quartz.JobExecutionException;
037import org.quartz.JobKey;
038import org.quartz.PersistJobDataAfterExecution;
039import org.quartz.SchedulerException;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import org.ametys.core.authentication.AuthenticateAction;
044import org.ametys.core.engine.BackgroundEngineHelper;
045import org.ametys.core.engine.BackgroundEnvironment;
046import org.ametys.core.schedule.Runnable.FireProcess;
047import org.ametys.core.schedule.progression.ContainerProgressionTracker;
048import org.ametys.core.schedule.progression.ProgressionTrackerFactory;
049import org.ametys.core.user.UserIdentity;
050import org.ametys.core.user.UserManager;
051import org.ametys.core.user.population.UserPopulationDAO;
052import org.ametys.plugins.core.impl.schedule.AbstractStaticSchedulable;
053import org.ametys.plugins.core.schedule.Scheduler;
054import org.ametys.plugins.core.user.UserDAO;
055import org.ametys.runtime.servlet.RuntimeServlet;
056import org.ametys.runtime.servlet.RuntimeServlet.RunMode;
057
058/**
059 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable}
060 */
061@PersistJobDataAfterExecution
062@DisallowConcurrentExecution
063public class AmetysJob implements Job
064{
065    /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */
066    public static final String KEY_LAST_DURATION = "duration";
067    /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */
068    public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime";
069    /** The key for the success state of the last execution of the job */
070    public static final String KEY_SUCCESS = "success";
071    
072    /** The service manager */
073    protected static ServiceManager _serviceManager;
074    /** The extension point for {@link Schedulable}s */
075    protected static SchedulableExtensionPoint _schedulableEP;
076    /** The scheduler component */
077    protected static Scheduler _scheduler;
078    /** The cocoon environment context. */
079    protected static org.apache.cocoon.environment.Context _environmentContext;
080    /** The user manager component */
081    protected static UserManager _userManager;
082    
083    /**
084     * Initialize the static fields.
085     * @param serviceManager The service manager
086     * @param context The context
087     * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint}
088     * @throws ContextException if environment context object not found
089     */
090    public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException
091    {
092        _serviceManager = serviceManager;
093        _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE);
094        _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE);
095        _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
096        _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE);
097    }
098    
099    /**
100     * Releases and destroys used resources
101     */
102    public static void dispose()
103    {
104        _serviceManager = null;
105        _schedulableEP = null;
106        _scheduler = null;
107        _environmentContext = null;
108        _userManager = null;
109    }
110    
111    @Override
112    public void execute(JobExecutionContext context) throws JobExecutionException
113    {
114        if (RuntimeServlet.getRunMode().equals(RunMode.STOPPING))
115        {
116            throw new JobExecutionException("Application is currently stopping. Impossible to run a job.");
117        }
118        
119        JobDetail detail = context.getJobDetail();
120        JobDataMap jobDataMap = detail.getJobDataMap();
121        JobKey jobKey = detail.getKey();
122        String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID);
123        String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID);
124        Schedulable schedulable = _schedulableEP.getExtension(schedulableId);
125        Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId);
126        
127        // Concurrency allowed ?
128        if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger))
129        {
130            logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId);
131            return;
132        }
133        
134        boolean success = true;
135        
136        // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future)
137        jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation
138        
139        Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger));
140        
141        String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY);
142        if (!_setUserInSession(environmentInformation, userIdentityAsString))
143        {
144            success = false;
145            String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId);
146            logger.error(message);
147            throw new JobExecutionException(message);
148        }
149        
150        logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap().toString());
151        Instant start = Instant.now();
152        try
153        {
154            ContainerProgressionTracker progressionTracker = ProgressionTrackerFactory.createContainerProgressionTracker(schedulable.getLabel(), logger);
155            context.put(AbstractStaticSchedulable.PROGRESSION_TRACKER, progressionTracker);
156            
157            schedulable.execute(context, progressionTracker);
158        }
159        catch (Exception e)
160        {
161            success = false;
162            String msg = String.format("An exception occurred during the execution of the Schedulable '%s'", schedulableId);
163            logger.error(msg, e);
164            throw new JobExecutionException(msg, e);
165        }
166        catch (Throwable t)
167        {
168            success = false;
169            logger.error(String.format("An error occurred during the execution of the Schedulable '%s'", schedulableId), t);
170            throw t;
171        }
172        finally
173        {
174            // Set the duration in the map
175            Instant end = Instant.now();
176            long duration = Duration.between(start, end).toMillis();
177            jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation
178            
179            BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
180            
181            // Success ?
182            jobDataMap.put(KEY_SUCCESS, success);
183            
184            // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it
185            if (FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS)))
186            {
187                jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true);
188            }
189        }
190    }
191    
192    private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString)
193    {
194        // For legacy purpose, we set the empty user to the system user
195        UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString)
196                .map(UserIdentity::stringToUserIdentity)
197                .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
198        
199        // Invalid user identity
200        if (_userManager.getUser(userIdentity) == null)
201        {
202            return false;
203        }
204        
205        BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
206        Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
207
208        AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
209        
210        return true;
211    }
212    
213    private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException
214    {
215        try
216        {
217            return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream()
218                    .map(JobExecutionContext::getJobDetail)
219                    .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this
220                    .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID))
221                    .filter(id -> id.equals(schedulableId))
222                    .findFirst()
223                    .isPresent();
224        }
225        catch (SchedulerException e)
226        {
227            logger.error(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
228            throw new JobExecutionException(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
229        }
230    }
231}