001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.Optional;
026import java.util.concurrent.Callable;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.Executors;
029import java.util.concurrent.Future;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.atomic.AtomicLong;
032
033import org.apache.avalon.framework.CascadingRuntimeException;
034import org.apache.avalon.framework.activity.Disposable;
035import org.apache.avalon.framework.component.Component;
036import org.apache.avalon.framework.context.ContextException;
037import org.apache.avalon.framework.context.Contextualizable;
038import org.apache.avalon.framework.logger.AbstractLogEnabled;
039import org.apache.avalon.framework.service.ServiceException;
040import org.apache.avalon.framework.service.ServiceManager;
041import org.apache.avalon.framework.service.Serviceable;
042import org.apache.cocoon.Constants;
043import org.apache.cocoon.components.ContextHelper;
044import org.apache.cocoon.environment.Context;
045import org.apache.cocoon.environment.ObjectModelHelper;
046import org.apache.cocoon.environment.Request;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050import org.ametys.core.authentication.AuthenticateAction;
051import org.ametys.core.engine.BackgroundEngineHelper;
052import org.ametys.core.engine.BackgroundEnvironment;
053import org.ametys.core.user.UserIdentity;
054import org.ametys.core.user.population.UserPopulationDAO;
055import org.ametys.plugins.core.user.UserDAO;
056
057/**
058 * Manager for dispatching {@link Event} instances to {@link Observer}s.
059 */
060public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
061{
    /** Avalon ROLE. */
    public static final String ROLE = ObservationManager.class.getName();
    
    /** Name of the request attribute storing the {@link Future}s collected by {@link #notify(Event)} during the current request. */
    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
    /** Name of the request attribute storing the additional arguments, as a map: event id -&gt; argument name -&gt; argument value. */
    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
    /** Dedicated logger on which received events and queued/observed asynchronous observers are traced, in addition to the component logger. */
    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
    
    /**
     * The executor service managing the single thread pool. This thread is
     * used to run non-parallelizable observers.
     * It is static, created lazily on the first asynchronous observation and
     * shut down by {@link #dispose()}.
     */
    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
    
    /**
     * The executor service managing the thread pool of asynchronous observers
     * allowed to run in parallel.
     * It is static, created lazily together with the single thread executor
     * and shut down by {@link #dispose()}.
     */
    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
    
    /** Cocoon context */
    protected Context _context;
    
    /** Service manager */
    protected ServiceManager _manager;
    
    /** Avalon context received in {@link #contextualize}, used to retrieve the current request. */
    private org.apache.avalon.framework.context.Context _avalonContext;
    /** Extension point providing the declared observers. */
    private ObserverExtensionPoint _observerExtPt;
    /**
     * Observers registered manually through {@link #registerObserver(Observer)}.
     * NOTE(review): plain ArrayList, iterated by notify() and mutated by register/unregister
     * without synchronization - confirm registration never happens concurrently with notification.
     */
    private Collection<Observer> _registeredObservers = new ArrayList<>();
090    
    /**
     * Keeps the service manager and looks up the {@link ObserverExtensionPoint} through it.
     * @param manager the service manager
     * @throws ServiceException if the extension point lookup fails
     */
    @Override
    public void service(ServiceManager manager) throws ServiceException
    {
        _manager = manager;
        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
    }
097    
    /**
     * Keeps the Avalon context (later used to retrieve the current request) and extracts
     * the Cocoon environment context stored under {@link Constants#CONTEXT_ENVIRONMENT_CONTEXT}.
     * @param context the Avalon context
     * @throws ContextException if the Cocoon environment context cannot be retrieved
     */
    @Override
    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
    {
        _avalonContext = context;
        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
    }
104    
105    /**
106     * Notify of a event which will be dispatch to registered
107     * observers.
108     * @param event the event to notify.
109     * @return The {@link Future} objects of the asynchronous observers
110     */
111    public List<Future> notify(final Event event)
112    {
113        List<Future> result = new ArrayList<>();
114        
115        try
116        {
117            if (getLogger().isDebugEnabled())
118            {
119                getLogger().debug("Receiving " + event);
120            }
121            
122            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
123            {
124                __ALL_EVENTS_LOGGER.info("Receiving " + event);
125            }
126            
127            List<Observer> supportedObservers = new ArrayList<>();
128            
129            // Retrieve supported observers from extension point
130            for (String observerId : _observerExtPt.getExtensionsIds())
131            {
132                Observer observer = _observerExtPt.getExtension(observerId);
133                _addSupportedObserver(supportedObservers, observer, event);
134            }
135            
136            // Retrieve supported observers from manual registration
137            for (Observer observer : _registeredObservers)
138            {
139                _addSupportedObserver(supportedObservers, observer, event);
140            }
141            
142            // Order observers (0 is first, Integer.MAX_INT is last)
143            Collections.sort(supportedObservers, Comparator.comparingInt(
144                observer -> observer.getPriority(event)
145            ));
146            
147            // Observes the event and prepares the asynchronous observes.
148            _putAdditionalArgs(event);
149            _observesEvent(event, supportedObservers, result);
150        }
151        catch (Exception e)
152        {
153            // Observers should never fail, so just log the error
154            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
155        }
156        
157        // Keep in current request the futures of the curent request
158        try
159        {
160            List<Future> futuresForCurrentRequest = _getFuturesForRequest();
161            futuresForCurrentRequest.addAll(result);
162        }
163        catch (CascadingRuntimeException e)
164        {
165            // can happen if no request
166            // do nothing though
167        }
168        
169        return result;
170    }
171    
172    private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event)
173    {
174        if (getLogger().isDebugEnabled())
175        {
176            getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
177        }
178        
179        if (observer.supports(event))
180        {
181            if (getLogger().isDebugEnabled())
182            {
183                getLogger().debug("Event: " + event + " supported for observer: " + observer);
184            }
185            
186            supportedObservers.add(observer);
187        }
188    }
189    
190    /**
191     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
192     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
193     * @return the {@link Future}s of the notified observers during the current request.
194     */
195    public List<Future> getFuturesForRequest()
196    {
197        try
198        {
199            return Collections.unmodifiableList(_getFuturesForRequest());
200        }
201        catch (CascadingRuntimeException e)
202        {
203            // can happen if no request
204            // return empty list
205            return Collections.emptyList();
206        }
207    }
208    
209    private List<Future> _getFuturesForRequest() throws CascadingRuntimeException
210    {
211        Request request = ContextHelper.getRequest(_avalonContext);
212        @SuppressWarnings("unchecked")
213        List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
214        if (futuresForCurrentRequest == null)
215        {
216            futuresForCurrentRequest = new ArrayList<>();
217            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
218        }
219        return futuresForCurrentRequest;
220    }
221    
222    /**
223     * For all events of the given event id which will be notified during the current request, add an additional argument
224     * @param eventIds The event ids
225     * @param argName The name of the additional argument
226     * @param argValue The value of the additional argument
227     */
228    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
229    {
230        Request request = ContextHelper.getRequest(_avalonContext);
231        
232        // Map eventId -> argName -> argValue
233        @SuppressWarnings("unchecked")
234        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
235        if (additionalArgsForEvents == null)
236        {
237            additionalArgsForEvents = new HashMap<>();
238            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
239        }
240        
241        for (String eventId : eventIds)
242        {
243            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
244            if (additionalArgs == null)
245            {
246                additionalArgs = new HashMap<>();
247                additionalArgsForEvents.put(eventId, additionalArgs);
248            }
249            
250            additionalArgs.put(argName, argValue);
251        }
252    }
253    
254    /**
255     * For all events of the given event id which will be notified during the current request, removes the additional argument
256     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
257     * @param eventIds The event ids
258     * @param argName The name of the additional argument to remove
259     */
260    public void removeArgumentForEvents(String[] eventIds, String argName)
261    {
262        Request request = ContextHelper.getRequest(_avalonContext);
263        
264        // Map eventId -> argName -> argValue
265        @SuppressWarnings("unchecked")
266        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
267        if (additionalArgsForEvents == null)
268        {
269            return;
270        }
271        
272        for (String eventId : eventIds)
273        {
274            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
275            if (additionalArgs == null)
276            {
277                break;
278            }
279            
280            additionalArgs.remove(argName);
281        }
282    }
283    
284    private void _putAdditionalArgs(final Event event)
285    {
286        String eventId = event.getId();
287        
288        Request request = null;
289        try
290        {
291            request = ContextHelper.getRequest(_avalonContext);
292        }
293        catch (CascadingRuntimeException e)
294        {
295            // can happen if no request
296            // return empty list
297        }
298        if (request != null)
299        {
300            @SuppressWarnings("unchecked")
301            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
302            if (additionalArgsForEvents != null)
303            {
304                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
305                if (additionalArgs != null)
306                {
307                    event.getArguments().putAll(additionalArgs);
308                }
309            }
310        }
311    }
312    
313    /**
314     * Observes the event
315     * @param event The event
316     * @param supportedObservers list of observers
317     * @param result The result object to fill 
318     * @throws Exception on error
319     */
320    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
321    {
322        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
323        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
324        
325        Map<String, Object> transientVars = new HashMap<>();
326        for (Observer supportedObserver : supportedObservers)
327        {
328            if (getLogger().isInfoEnabled())
329            {
330                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
331            }
332            
333            // Observe current event
334            if (supportedObserver instanceof AsyncObserver)
335            {
336                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
337                boolean parallelizable = asyncObserver.parallelizable();
338                
339                if (getLogger().isDebugEnabled())
340                {
341                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
342                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
343                }
344                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
345                {
346                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
347                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
348                }
349                
350                if (parallelizable)
351                {
352                    parallelizableAsyncObservers.add(asyncObserver);
353                }
354                else
355                {
356                    nonParallelizableAsyncObservers.add(asyncObserver);
357                }
358            }
359            else
360            {
361                try
362                {
363                    supportedObserver.observe(event, transientVars);
364                }
365                catch (Throwable e)
366                {
367                    // Observation process should continue
368                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
369                }
370            }
371        }
372        
373        // Observe for async observer.
374        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
375        {
376            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
377        }
378    }
379
380    /**
381     * Async observe through a thread pool
382     * @param parallelObservers parallelizable asynchronous observers
383     * @param nonParallelObservers non parallelizable asynchronous observers
384     * @param event The event to observe
385     * @param transientVars The observer transient vars
386     * @param result The future results
387     */
388    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
389    {
390        if (__SINGLE_THREAD_EXECUTOR == null)
391        {
392            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
393            
394            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
395            
396            // 1 thread per core
397            __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
398        }
399        
400        if (!parallelObservers.isEmpty())
401        {
402            for (AsyncObserver observer :  parallelObservers)
403            {
404                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
405                result.add(future);
406            }
407        }
408        
409        if (!nonParallelObservers.isEmpty())
410        {
411            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
412            result.add(future);
413        }
414    }
415    
416    /**
417     * Runnable to be used for asynchronous calls 
418     */
419    abstract class AbstractAsyncObserve implements Callable<Object>
420    {
421        /** event to observe */
422        protected final Event _event;
423        protected final Map<String, Object> _transientVars;
424        protected final org.apache.avalon.framework.logger.Logger _logger;
425        protected final Logger _allEventLogger;
426        
427        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
428        {
429            _event = event;
430            _transientVars = transientVars;
431            _logger = logger;
432            _allEventLogger = allEventLogger;
433        }
434        
435        @Override
436        public Object call()
437        {
438            Map<String, Object> environmentInformation = null;
439            Integer computedResult = 0;
440            try
441            {
442                // Create the environment.
443                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
444                
445                _setUserInSession(environmentInformation);
446                
447                _observe();
448            }
449            catch (Exception e)
450            {
451                // Observer must never fail, so just log the error
452                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
453                computedResult = -1;
454            }
455            finally
456            {
457                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
458                
459                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
460            }
461            
462            return computedResult;
463        }
464        
465        private boolean _setUserInSession(Map<String, Object> environmentInformation)
466        {
467            UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer())
468                    .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
469            
470            BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
471            Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
472
473            AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
474            
475            return true;
476        }
477        
478        /**
479         * Abstract observe method where the observation should be done
480         * @throws Exception on error
481         */
482        protected abstract void _observe() throws Exception;
483    }
484    
485    /**
486     * Runnable for parallel observers 
487     */
488    class ParallelAsyncObserve extends AbstractAsyncObserve
489    {
490        private final AsyncObserver _observer;
491        
492        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
493        {
494            super(event, transientVars, logger, allEventLogger);
495            _observer = observer;
496        }
497        
498        @Override
499        protected void _observe() throws Exception
500        {
501            if (_logger.isDebugEnabled())
502            {
503                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
504            }
505            if (_allEventLogger.isDebugEnabled())
506            {
507                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
508            }
509            
510            _observer.observe(_event, _transientVars);
511        }
512    }
513    
514    /**
515     * Runnable for non parallel observers 
516     */
517    class NonParallelAsyncObserve extends AbstractAsyncObserve
518    {
519        private final Collection<AsyncObserver> _observers;
520        
521        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
522        {
523            super(event, transientVars, logger, allEventLogger);
524            _observers = observers;
525        }
526        
527        @Override
528        protected void _observe() throws Exception
529        {
530            for (AsyncObserver observer : _observers)
531            {
532                if (_logger.isDebugEnabled())
533                {
534                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
535                }
536                if (_allEventLogger.isDebugEnabled())
537                {
538                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
539                }
540                
541                observer.observe(_event, _transientVars);
542            }
543        }
544    }
545    
546    /**
547     * Thread factory for async observers.
548     * Set the thread name format and marks the thread as daemon. 
549     */
550    static class AsyncObserveThreadFactory implements ThreadFactory
551    {
552        private static ThreadFactory _defaultThreadFactory;
553        private static String _nameFormat;
554        private static AtomicLong _count;
555        
556        public AsyncObserveThreadFactory()
557        {
558            _defaultThreadFactory = Executors.defaultThreadFactory();
559            _nameFormat = "ametys-async-observe-%d";
560            _count = new AtomicLong(0);
561        }
562        
563        public Thread newThread(Runnable r)
564        {
565            Thread thread = _defaultThreadFactory.newThread(r);
566            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
567            thread.setDaemon(true);
568            
569            return thread;
570        }
571    }
572    
573    @Override
574    public void dispose()
575    {
576        if (__SINGLE_THREAD_EXECUTOR != null)
577        {
578            __SINGLE_THREAD_EXECUTOR.shutdownNow();
579            __SINGLE_THREAD_EXECUTOR = null;
580        }
581        if (__PARALLEL_THREAD_EXECUTOR != null)
582        {
583            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
584            __PARALLEL_THREAD_EXECUTOR = null;
585        }
586        
587        _context = null;
588        _manager = null;
589    }
590
    /**
     * Registers an {@link Observer} manually, in addition to the ones declared on the extension point.
     * It is then taken into account by {@link #notify(Event)} for the events it supports.
     * @param observer the {@link Observer}.
     */
    public void registerObserver(Observer observer)
    {
        _registeredObservers.add(observer);
    }
599    
    /**
     * Unregisters an {@link Observer} previously registered with {@link #registerObserver(Observer)}.
     * Does nothing if the observer was not registered.
     * @param observer the {@link Observer}.
     */
    public void unregisterObserver(Observer observer)
    {
        _registeredObservers.remove(observer);
    }
608}