001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.Set;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.avalon.framework.CascadingRuntimeException;
036import org.apache.avalon.framework.activity.Disposable;
037import org.apache.avalon.framework.component.Component;
038import org.apache.avalon.framework.context.ContextException;
039import org.apache.avalon.framework.context.Contextualizable;
040import org.apache.avalon.framework.logger.AbstractLogEnabled;
041import org.apache.avalon.framework.service.ServiceException;
042import org.apache.avalon.framework.service.ServiceManager;
043import org.apache.avalon.framework.service.Serviceable;
044import org.apache.cocoon.Constants;
045import org.apache.cocoon.components.ContextHelper;
046import org.apache.cocoon.environment.Context;
047import org.apache.cocoon.environment.ObjectModelHelper;
048import org.apache.cocoon.environment.Request;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.ametys.core.authentication.AuthenticateAction;
053import org.ametys.core.engine.BackgroundEngineHelper;
054import org.ametys.core.engine.BackgroundEnvironment;
055import org.ametys.core.user.UserIdentity;
056import org.ametys.core.user.population.UserPopulationDAO;
057import org.ametys.plugins.core.user.UserDAO;
058
059/**
060 * Manager for dispatching {@link Event} instances to {@link Observer}s.
061 */
062public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
063{
    /**
     * Associates the {@link Future} of a submitted asynchronous observation with the traits
     * of the observer(s) executed by that future.
     * <br>For non-parallelizable observers, which are submitted together as one single task,
     * the traits are the union of the traits of all the observers of the task.
     * @param future the future of the submitted task
     * @param traits the traits of the observer(s) run by the task
     */
    public record ObserverFuture(Future future, Set<String> traits) { }
070
    /** Avalon ROLE. */
    public static final String ROLE = ObservationManager.class.getName();
    
    /** Name of the request attribute holding the list of {@link ObserverFuture} created during the current request */
    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
    /** Name of the request attribute holding the additional arguments, as a map: event id -> argument name -> argument value */
    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
    /** Dedicated logger receiving a trace of the received events and of the asynchronous observations */
    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
    
    /**
     * The executor service managing the single thread pool. This thread is
     * used to run non-parallelizable observers.
     * Created lazily on the first asynchronous observation and shut down on dispose.
     */
    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
    
    /**
     * The executor service managing the thread pool of asynchronous observers
     * allowed to run in parallel.
     * Created lazily together with the single thread executor and shut down on dispose.
     */
    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
    
    /** Cocoon context */
    protected Context _context;
    
    /** Service manager */
    protected ServiceManager _manager;
    
    /** Avalon context, used to retrieve the current request */
    private org.apache.avalon.framework.context.Context _avalonContext;
    /** Extension point providing the declared observers */
    private ObserverExtensionPoint _observerExtPt;
    /** Observers registered manually through {@link #registerObserver(Observer)} */
    private Collection<Observer> _registeredObservers = new ArrayList<>();
099    
100    @Override
101    public void service(ServiceManager manager) throws ServiceException
102    {
103        _manager = manager;
104        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
105    }
106    
107    @Override
108    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
109    {
110        _avalonContext = context;
111        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
112    }
113    
114    /**
115     * Notify of a event which will be dispatch to registered
116     * observers.
117     * @param event the event to notify.
118     * @return The {@link Future} objects of the asynchronous observers
119     */
120    public List<Future> notify(final Event event)
121    {
122        List<Future> result = new ArrayList<>();
123        
124        try
125        {
126            if (getLogger().isDebugEnabled())
127            {
128                getLogger().debug("Receiving " + event);
129            }
130            
131            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
132            {
133                __ALL_EVENTS_LOGGER.info("Receiving " + event);
134            }
135            
136            List<Observer> supportedObservers = new ArrayList<>();
137            
138            // Retrieve supported observers from extension point
139            for (String observerId : _observerExtPt.getExtensionsIds())
140            {
141                Observer observer = _observerExtPt.getExtension(observerId);
142                _addSupportedObserver(supportedObservers, observer, event);
143            }
144            
145            // Retrieve supported observers from manual registration
146            for (Observer observer : _registeredObservers)
147            {
148                _addSupportedObserver(supportedObservers, observer, event);
149            }
150            
151            // Order observers (0 is first, Integer.MAX_INT is last)
152            Collections.sort(supportedObservers, Comparator.comparingInt(
153                observer -> observer.getPriority(event)
154            ));
155            
156            // Observes the event and prepares the asynchronous observes.
157            _putAdditionalArgs(event);
158            _observesEvent(event, supportedObservers, result);
159        }
160        catch (Exception e)
161        {
162            // Observers should never fail, so just log the error
163            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
164        }
165        
166        return result;
167    }
168    
169    private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event)
170    {
171        if (getLogger().isDebugEnabled())
172        {
173            getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
174        }
175        
176        if (observer.supports(event))
177        {
178            if (getLogger().isDebugEnabled())
179            {
180                getLogger().debug("Event: " + event + " supported for observer: " + observer);
181            }
182            
183            supportedObservers.add(observer);
184        }
185    }
186    
187    /**
188     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
189     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
190     * @return the {@link Future}s of the notified observers during the current request.
191     */
192    public List<ObserverFuture> getFuturesForRequest()
193    {
194        try
195        {
196            return Collections.unmodifiableList(_getFuturesForRequest());
197        }
198        catch (CascadingRuntimeException e)
199        {
200            // can happen if no request
201            // return empty map
202            return List.of();
203        }
204    }
205    
206    private List<ObserverFuture> _getFuturesForRequest() throws CascadingRuntimeException
207    {
208        Request request = ContextHelper.getRequest(_avalonContext);
209        @SuppressWarnings("unchecked")
210        List<ObserverFuture> futuresForCurrentRequest = (List<ObserverFuture>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
211        if (futuresForCurrentRequest == null)
212        {
213            futuresForCurrentRequest = new ArrayList<>();
214            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
215        }
216        return futuresForCurrentRequest;
217    }
218    
219    /**
220     * For all events of the given event id which will be notified during the current request, add an additional argument
221     * @param eventIds The event ids
222     * @param argName The name of the additional argument
223     * @param argValue The value of the additional argument
224     */
225    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
226    {
227        Request request = ContextHelper.getRequest(_avalonContext);
228        
229        // Map eventId -> argName -> argValue
230        @SuppressWarnings("unchecked")
231        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
232        if (additionalArgsForEvents == null)
233        {
234            additionalArgsForEvents = new HashMap<>();
235            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
236        }
237        
238        for (String eventId : eventIds)
239        {
240            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
241            if (additionalArgs == null)
242            {
243                additionalArgs = new HashMap<>();
244                additionalArgsForEvents.put(eventId, additionalArgs);
245            }
246            
247            additionalArgs.put(argName, argValue);
248        }
249    }
250    
251    /**
252     * For all events of the given event id which will be notified during the current request, removes the additional argument
253     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
254     * @param eventIds The event ids
255     * @param argName The name of the additional argument to remove
256     */
257    public void removeArgumentForEvents(String[] eventIds, String argName)
258    {
259        Request request = ContextHelper.getRequest(_avalonContext);
260        
261        // Map eventId -> argName -> argValue
262        @SuppressWarnings("unchecked")
263        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
264        if (additionalArgsForEvents == null)
265        {
266            return;
267        }
268        
269        for (String eventId : eventIds)
270        {
271            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
272            if (additionalArgs == null)
273            {
274                break;
275            }
276            
277            additionalArgs.remove(argName);
278        }
279    }
280    
281    private void _putAdditionalArgs(final Event event)
282    {
283        String eventId = event.getId();
284        
285        Request request = null;
286        try
287        {
288            request = ContextHelper.getRequest(_avalonContext);
289        }
290        catch (CascadingRuntimeException e)
291        {
292            // can happen if no request
293            // return empty list
294        }
295        if (request != null)
296        {
297            @SuppressWarnings("unchecked")
298            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
299            if (additionalArgsForEvents != null)
300            {
301                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
302                if (additionalArgs != null)
303                {
304                    event.getArguments().putAll(additionalArgs);
305                }
306            }
307        }
308    }
309    
310    /**
311     * Observes the event
312     * @param event The event
313     * @param supportedObservers list of observers
314     * @param result The result object to fill 
315     * @throws Exception on error
316     */
317    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
318    {
319        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
320        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
321        
322        Map<String, Object> transientVars = new HashMap<>();
323        for (Observer supportedObserver : supportedObservers)
324        {
325            if (getLogger().isInfoEnabled())
326            {
327                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
328            }
329            
330            // Observe current event
331            if (supportedObserver instanceof AsyncObserver)
332            {
333                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
334                boolean parallelizable = asyncObserver.parallelizable();
335                
336                if (getLogger().isDebugEnabled())
337                {
338                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
339                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
340                }
341                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
342                {
343                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
344                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
345                }
346                
347                if (parallelizable)
348                {
349                    parallelizableAsyncObservers.add(asyncObserver);
350                }
351                else
352                {
353                    nonParallelizableAsyncObservers.add(asyncObserver);
354                }
355            }
356            else
357            {
358                try
359                {
360                    supportedObserver.observe(event, transientVars);
361                }
362                catch (Throwable e)
363                {
364                    // Observation process should continue
365                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
366                }
367            }
368        }
369        
370        // Observe for async observer.
371        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
372        {
373            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
374        }
375    }
376
377    /**
378     * Async observe through a thread pool
379     * @param parallelObservers parallelizable asynchronous observers
380     * @param nonParallelObservers non parallelizable asynchronous observers
381     * @param event The event to observe
382     * @param transientVars The observer transient vars
383     * @param result The future results
384     */
385    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
386    {
387        // Keep in current request the futures of the curent request
388        List<ObserverFuture> futuresForCurrentRequest = null;
389        try
390        {
391            futuresForCurrentRequest = _getFuturesForRequest();
392        }
393        catch (CascadingRuntimeException e)
394        {
395            // can happen if no request
396            // do nothing though
397        }
398
399        if (__SINGLE_THREAD_EXECUTOR == null)
400        {
401            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
402            
403            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
404            
405            // 1 thread per core
406            __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
407        }
408        
409        if (!parallelObservers.isEmpty())
410        {
411            for (AsyncObserver observer :  parallelObservers)
412            {
413                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
414                result.add(future);
415                if (futuresForCurrentRequest != null)
416                {
417                    futuresForCurrentRequest.add(new ObserverFuture(future, observer.getTraits()));
418                }
419            }
420        }
421        
422        if (!nonParallelObservers.isEmpty())
423        {
424            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
425            result.add(future);
426            if (futuresForCurrentRequest != null)
427            {
428                Set<String> traits = new HashSet<>();
429                for (Observer observer : nonParallelObservers)
430                {
431                    traits.addAll(observer.getTraits());
432                }
433                futuresForCurrentRequest.add(new ObserverFuture(future, traits));
434            }
435        }
436    }
437    
438    /**
439     * Runnable to be used for asynchronous calls 
440     */
441    abstract class AbstractAsyncObserve implements Callable<Object>
442    {
443        /** event to observe */
444        protected final Event _event;
445        protected final Map<String, Object> _transientVars;
446        protected final org.apache.avalon.framework.logger.Logger _logger;
447        protected final Logger _allEventLogger;
448        
449        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
450        {
451            _event = event;
452            _transientVars = transientVars;
453            _logger = logger;
454            _allEventLogger = allEventLogger;
455        }
456        
457        @Override
458        public Object call()
459        {
460            Map<String, Object> environmentInformation = null;
461            Integer computedResult = 0;
462            try
463            {
464                // Create the environment.
465                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
466                
467                _setUserInSession(environmentInformation);
468                
469                _observe();
470            }
471            catch (Exception e)
472            {
473                // Observer must never fail, so just log the error
474                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
475                computedResult = -1;
476            }
477            finally
478            {
479                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
480                
481                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
482            }
483            
484            return computedResult;
485        }
486        
487        private boolean _setUserInSession(Map<String, Object> environmentInformation)
488        {
489            UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer())
490                    .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
491            
492            BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
493            Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
494
495            AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
496            
497            return true;
498        }
499        
500        /**
501         * Abstract observe method where the observation should be done
502         * @throws Exception on error
503         */
504        protected abstract void _observe() throws Exception;
505    }
506    
507    /**
508     * Runnable for parallel observers 
509     */
510    class ParallelAsyncObserve extends AbstractAsyncObserve
511    {
512        private final AsyncObserver _observer;
513        
514        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
515        {
516            super(event, transientVars, logger, allEventLogger);
517            _observer = observer;
518        }
519        
520        @Override
521        protected void _observe() throws Exception
522        {
523            if (_logger.isDebugEnabled())
524            {
525                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
526            }
527            if (_allEventLogger.isDebugEnabled())
528            {
529                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
530            }
531            
532            _observer.observe(_event, _transientVars);
533        }
534    }
535    
536    /**
537     * Runnable for non parallel observers 
538     */
539    class NonParallelAsyncObserve extends AbstractAsyncObserve
540    {
541        private final Collection<AsyncObserver> _observers;
542        
543        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
544        {
545            super(event, transientVars, logger, allEventLogger);
546            _observers = observers;
547        }
548        
549        @Override
550        protected void _observe() throws Exception
551        {
552            for (AsyncObserver observer : _observers)
553            {
554                if (_logger.isDebugEnabled())
555                {
556                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
557                }
558                if (_allEventLogger.isDebugEnabled())
559                {
560                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
561                }
562                
563                observer.observe(_event, _transientVars);
564            }
565        }
566    }
567    
568    /**
569     * Thread factory for async observers.
570     * Set the thread name format and marks the thread as daemon. 
571     */
572    static class AsyncObserveThreadFactory implements ThreadFactory
573    {
574        private static ThreadFactory _defaultThreadFactory;
575        private static String _nameFormat;
576        private static AtomicLong _count;
577        
578        public AsyncObserveThreadFactory()
579        {
580            _defaultThreadFactory = Executors.defaultThreadFactory();
581            _nameFormat = "ametys-async-observe-%d";
582            _count = new AtomicLong(0);
583        }
584        
585        public Thread newThread(Runnable r)
586        {
587            Thread thread = _defaultThreadFactory.newThread(r);
588            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
589            thread.setDaemon(true);
590            
591            return thread;
592        }
593    }
594    
595    @Override
596    public void dispose()
597    {
598        if (__SINGLE_THREAD_EXECUTOR != null)
599        {
600            __SINGLE_THREAD_EXECUTOR.shutdownNow();
601            __SINGLE_THREAD_EXECUTOR = null;
602        }
603        if (__PARALLEL_THREAD_EXECUTOR != null)
604        {
605            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
606            __PARALLEL_THREAD_EXECUTOR = null;
607        }
608        
609        _context = null;
610        _manager = null;
611    }
612
613    /**
614     * Registers an {@link Observer}.
615     * @param observer the {@link Observer}.
616     */
617    public void registerObserver(Observer observer)
618    {
619        _registeredObservers.add(observer);
620    }
621    
622    /**
623     * Unregisters an {@link Observer}.
624     * @param observer the {@link Observer}.
625     */
626    public void unregisterObserver(Observer observer)
627    {
628        _registeredObservers.remove(observer);
629    }
630}