001/*
002 *  Copyright 2016 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.schedule;
017
018import java.time.Duration;
019import java.time.Instant;
020import java.util.Map;
021import java.util.Optional;
022
023import org.apache.avalon.framework.context.Context;
024import org.apache.avalon.framework.context.ContextException;
025import org.apache.avalon.framework.service.ServiceException;
026import org.apache.avalon.framework.service.ServiceManager;
027import org.apache.cocoon.Constants;
028import org.apache.cocoon.environment.ObjectModelHelper;
029import org.apache.cocoon.environment.Request;
030import org.apache.cocoon.environment.background.BackgroundEnvironment;
031import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
032import org.quartz.DisallowConcurrentExecution;
033import org.quartz.Job;
034import org.quartz.JobDataMap;
035import org.quartz.JobDetail;
036import org.quartz.JobExecutionContext;
037import org.quartz.JobExecutionException;
038import org.quartz.JobKey;
039import org.quartz.PersistJobDataAfterExecution;
040import org.quartz.SchedulerException;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import org.ametys.core.authentication.AuthenticateAction;
045import org.ametys.core.engine.BackgroundEngineHelper;
046import org.ametys.core.schedule.Runnable.FireProcess;
047import org.ametys.core.user.UserIdentity;
048import org.ametys.core.user.UserManager;
049import org.ametys.core.user.population.UserPopulationDAO;
050import org.ametys.plugins.core.schedule.Scheduler;
051import org.ametys.plugins.core.user.UserDAO;
052
053/**
054 * Ametys implementation of a {@link Job} which delegates the execution of the task to the right {@link Schedulable}
055 */
056@PersistJobDataAfterExecution
057@DisallowConcurrentExecution
058public class AmetysJob implements Job
059{
060    /** The key for the last duration of the {@link #execute(org.quartz.JobExecutionContext)} method which is stored in the {@link JobDataMap} */
061    public static final String KEY_LAST_DURATION = "duration";
062    /** The key for the previous fire time of this job which is stored in the {@link JobDataMap} */
063    public static final String KEY_PREVIOUS_FIRE_TIME = "previousFireTime";
064    /** The key for the success state of the last execution of the job */
065    public static final String KEY_SUCCESS = "success";
066    
067    /** The service manager */
068    protected static ServiceManager _serviceManager;
069    /** The extension point for {@link Schedulable}s */
070    protected static SchedulableExtensionPoint _schedulableEP;
071    /** The scheduler component */
072    protected static Scheduler _scheduler;
073    /** The cocoon environment context. */
074    protected static org.apache.cocoon.environment.Context _environmentContext;
075    /** The user manager component */
076    protected static UserManager _userManager;
077    
078    /**
079     * Initialize the static fields.
080     * @param serviceManager The service manager
081     * @param context The context
082     * @throws ServiceException if an error occurs during the lookup of the {@link SchedulableExtensionPoint}
083     * @throws ContextException if environment context object not found
084     */
085    public static void initialize(ServiceManager serviceManager, Context context) throws ServiceException, ContextException
086    {
087        _serviceManager = serviceManager;
088        _schedulableEP = (SchedulableExtensionPoint) serviceManager.lookup(SchedulableExtensionPoint.ROLE);
089        _scheduler = (Scheduler) serviceManager.lookup(Scheduler.ROLE);
090        _environmentContext = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
091        _userManager = (UserManager) serviceManager.lookup(UserManager.ROLE);
092    }
093    
094    /**
095     * Releases and destroys used resources
096     */
097    public static void dispose()
098    {
099        _serviceManager = null;
100        _schedulableEP = null;
101        _scheduler = null;
102        _environmentContext = null;
103        _userManager = null;
104    }
105    
106    @Override
107    public void execute(JobExecutionContext context) throws JobExecutionException
108    {
109        JobDetail detail = context.getJobDetail();
110        JobDataMap jobDataMap = detail.getJobDataMap();
111        JobKey jobKey = detail.getKey();
112        String runnableId = jobDataMap.getString(Scheduler.KEY_RUNNABLE_ID);
113        String schedulableId = jobDataMap.getString(Scheduler.KEY_SCHEDULABLE_ID);
114        Schedulable schedulable = _schedulableEP.getExtension(schedulableId);
115        Logger logger = LoggerFactory.getLogger(AmetysJob.class.getName() + "$" + schedulableId);
116        
117        // Concurrency allowed ?
118        if (!schedulable.acceptConcurrentExecution() && _checkConcurrency(schedulableId, runnableId, jobKey, logger))
119        {
120            logger.warn("The Runnable '{}' of the Schedulable '{}' cannot be executed now because concurrent execution is not allowed for this Schedulable", runnableId, schedulableId);
121            return;
122        }
123        
124        boolean success = true;
125        
126        // Set the previous (which is actually the current) fire time in the map (because the trigger may no longer exist in the future)
127        jobDataMap.put(KEY_PREVIOUS_FIRE_TIME, context.getTrigger().getPreviousFireTime().getTime()); // possible with @PersistJobDataAfterExecution annotation
128        
129        Map<String, Object> environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_serviceManager, _environmentContext, new SLF4JLoggerAdapter(logger));
130        
131        String userIdentityAsString = jobDataMap.getString(Scheduler.KEY_RUNNABLE_USERIDENTITY);
132        if (!_setUserInSession(environmentInformation, userIdentityAsString))
133        {
134            success = false;
135            String message = String.format("The UserIdentity '%s' defined for the execution of the job '%s' is not valid", userIdentityAsString, schedulableId);
136            logger.error(message);
137            throw new JobExecutionException(message);
138        }
139        
140        logger.info("Executing the Runnable '{}' of the Schedulable '{}' with jobDataMap:\n '{}'", runnableId, schedulableId, jobDataMap.getWrappedMap().toString());
141        Instant start = Instant.now();
142        try
143        {
144            schedulable.execute(context);
145        }
146        catch (Exception e)
147        {
148            success = false;
149            logger.error(String.format("An exception occured during the execution of the Schedulable '%s'", schedulableId), e);
150            throw new JobExecutionException(String.format("An exception occured during the execution of the job '%s'", schedulableId), e);
151        }
152        catch (Throwable t)
153        {
154            success = false;
155            logger.error(String.format("An error occured during the execution of the Schedulable '%s'", schedulableId), t);
156            throw t;
157        }
158        finally
159        {
160            // Set the duration in the map
161            Instant end = Instant.now();
162            long duration = Duration.between(start, end).toMillis();
163            jobDataMap.put(KEY_LAST_DURATION, duration); // possible with @PersistJobDataAfterExecution annotation
164            
165            // Leave the Engine Environment
166            if (environmentInformation != null)
167            {
168                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
169            }
170            
171            // Success ?
172            jobDataMap.put(KEY_SUCCESS, success);
173            
174            // Run at startup tasks are one-shot tasks => if so, indicates it is completed for never refiring it
175            if (FireProcess.STARTUP.toString().equals(jobDataMap.getString(Scheduler.KEY_RUNNABLE_FIRE_PROCESS)))
176            {
177                jobDataMap.put(Scheduler.KEY_RUNNABLE_STARTUP_COMPLETED, true);
178            }
179        }
180    }
181    
182    private boolean _setUserInSession(Map<String, Object> environmentInformation, String userIdentityAsString)
183    {
184        // We set the empty user to the system user
185        UserIdentity userIdentity = Optional.ofNullable(userIdentityAsString)
186                .map(UserIdentity::stringToUserIdentity)
187                .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
188        
189        // Invalid user identity
190        if (_userManager.getUser(userIdentity) == null)
191        {
192            return false;
193        }
194        
195        BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
196        Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
197
198        AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
199        
200        return true;
201    }
202    
203    private boolean _checkConcurrency(String schedulableId, String runnableId, JobKey currentJobKey, Logger logger) throws JobExecutionException
204    {
205        try
206        {
207            return _scheduler.getScheduler().getCurrentlyExecutingJobs().stream()
208                    .map(JobExecutionContext::getJobDetail)
209                    .filter(detail -> !detail.getKey().equals(currentJobKey)) // currently executing without this
210                    .map(detail -> detail.getJobDataMap().getString(Scheduler.KEY_SCHEDULABLE_ID))
211                    .filter(id -> id.equals(schedulableId))
212                    .findFirst()
213                    .isPresent();
214        }
215        catch (SchedulerException e)
216        {
217            logger.error(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
218            throw new JobExecutionException(String.format("An error occured during the concurrency check of the Runnable '%s'", runnableId), e);
219        }
220    }
221}