001/*
002 *  Copyright 2016 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.schedule;
017
018import java.time.Duration;
019import java.time.Instant;
020import java.util.Map;
021import java.util.Optional;
022
023import org.apache.avalon.framework.context.Context;
024import org.apache.avalon.framework.context.ContextException;
025import org.apache.avalon.framework.service.ServiceException;
026import org.apache.avalon.framework.service.ServiceManager;
027import org.apache.cocoon.Constants;
028import org.apache.cocoon.environment.ObjectModelHelper;
029import org.apache.cocoon.environment.Request;
030import org.apache.cocoon.environment.background.BackgroundEnvironment;
031import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
032import org.quartz.DisallowConcurrentExecution;
033import org.quartz.Job;
034import org.quartz.JobDataMap;
035import org.quartz.JobDetail;
036import org.quartz.JobExecutionContext;
037import org.quartz.JobExecutionException;
038import org.quartz.JobKey;
039import org.quartz.PersistJobDataAfterExecution;
040import org.quartz.SchedulerException;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.ametys.core.authentication.AuthenticateAction;
045import org.ametys.core.engine.BackgroundEngineHelper;
046import org.ametys.core.schedule.Runnable.FireProcess;
047import org.ametys.core.user.UserIdentity;
048import org.ametys.core.user.UserManager;
049import org.ametys.core.user.population.UserPopulationDAO;
050import org.ametys.plugins.core.schedule.Scheduler;
051import org.ametys.plugins.core.user.UserDAO;
052import org.ametys.runtime.servlet.RuntimeServlet;
053import org.ametys.runtime.servlet.RuntimeServlet.RunMode;
054
055/**
056 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable}
057 */
058@PersistJobDataAfterExecution
059@DisallowConcurrentExecution
060public class AmetysJob implements Job
061{
062    /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */
063    public static final String KEY_LAST_DURATION = "duration";
064    /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */
065    public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime";
066    /** The key for the success state of the last execution of the job */
067    public static final String KEY_SUCCESS = "success";
068    
069    /** The service manager */
070    protected static ServiceManager _serviceManager;
071    /** The extension point for {@link Schedulable}s */
072    protected static SchedulableExtensionPoint _schedulableEP;
073    /** The scheduler component */
074    protected static Scheduler _scheduler;
075    /** The cocoon environment context. */
076    protected static org.apache.cocoon.environment.Context _environmentContext;
077    /** The user manager component */
078    protected static UserManager _userManager;
079    
080    /**
081     * Initialize the static fields.
082     * @param serviceManager The service manager
083     * @param context The context
084     * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint}
085     * @throws ContextException if environment context object not found
086     */
087    public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException
088    {
089        _serviceManager = serviceManager;
090        _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE);
091        _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE);
092        _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
093        _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE);
094    }
095    
096    /**
097     * Releases and destroys used resources
098     */
099    public static void dispose()
100    {
101        _serviceManager = null;
102        _schedulableEP = null;
103        _scheduler = null;
104        _environmentContext = null;
105        _userManager = null;
106    }
107    
108    @Override
109    public void execute(JobExecutionContext context) throws JobExecutionException
110    {
111        if (RuntimeServlet.getRunMode().equals(RunMode.STOPPING))
112        {
113            throw new JobExecutionException("Application is currently stopping. Impossible to run a job.");
114        }
115        
116        JobDetail detail = context.getJobDetail();
117        JobDataMap jobDataMap = detail.getJobDataMap();
118        JobKey jobKey = detail.getKey();
119        String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID);
120        String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID);
121        Schedulable schedulable = _schedulableEP.getExtension(schedulableId);
122        Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId);
123        
124        // Concurrency allowed ?
125        if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger))
126        {
127            logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId);
128            return;
129        }
130        
131        boolean success = true;
132        
133        // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future)
134        jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation
135        
136        Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger));
137        
138        String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY);
139        if (!_setUserInSession(environmentInformation, userIdentityAsString))
140        {
141            success = false;
142            String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId);
143            logger.error(message);
144            throw new JobExecutionException(message);
145        }
146        
147        logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap().toString());
148        Instant start = Instant.now();
149        try
150        {
151            schedulable.execute(context);
152        }
153        catch (Exception e)
154        {
155            success = false;
156            logger.error(String.format("An exception occured during the execution of the Schedulable '%s'", schedulableId), e);
157            throw new JobExecutionException(String.format("An exception occured during the execution of the job '%s'", schedulableId), e);
158        }
159        catch (Throwable t)
160        {
161            success = false;
162            logger.error(String.format("An error occured during the execution of the Schedulable '%s'", schedulableId), t);
163            throw t;
164        }
165        finally
166        {
167            // Set the duration in the map
168            Instant end = Instant.now();
169            long duration = Duration.between(start, end).toMillis();
170            jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation
171            
172            // Leave the Engine Environment
173            if (environmentInformation != null)
174            {
175                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
176            }
177            
178            // Success ?
179            jobDataMap.put(KEY_SUCCESS, success);
180            
181            // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it
182            if (FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS)))
183            {
184                jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true);
185            }
186        }
187    }
188    
189    private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString)
190    {
191        // We set the empty user to the system user
192        UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString)
193                .map(UserIdentity::stringToUserIdentity)
194                .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
195        
196        // Invalid user identity
197        if (_userManager.getUser(userIdentity) == null)
198        {
199            return false;
200        }
201        
202        BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
203        Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
204
205        AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
206        
207        return true;
208    }
209    
210    private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException
211    {
212        try
213        {
214            return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream()
215                    .map(JobExecutionContext::getJobDetail)
216                    .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this
217                    .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID))
218                    .filter(id -> id.equals(schedulableId))
219                    .findFirst()
220                    .isPresent();
221        }
222        catch (SchedulerException e)
223        {
224            logger.error(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
225            throw new JobExecutionException(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
226        }
227    }
228}