001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Optional;
027import java.util.Set;
028import java.util.concurrent.Callable;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.avalon.framework.CascadingRuntimeException;
036import org.apache.avalon.framework.activity.Disposable;
037import org.apache.avalon.framework.component.Component;
038import org.apache.avalon.framework.context.ContextException;
039import org.apache.avalon.framework.context.Contextualizable;
040import org.apache.avalon.framework.logger.AbstractLogEnabled;
041import org.apache.avalon.framework.service.ServiceException;
042import org.apache.avalon.framework.service.ServiceManager;
043import org.apache.avalon.framework.service.Serviceable;
044import org.apache.cocoon.Constants;
045import org.apache.cocoon.components.ContextHelper;
046import org.apache.cocoon.environment.Context;
047import org.apache.cocoon.environment.ObjectModelHelper;
048import org.apache.cocoon.environment.Request;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052import org.ametys.core.authentication.AuthenticateAction;
053import org.ametys.core.engine.BackgroundEngineHelper;
054import org.ametys.core.engine.BackgroundEnvironment;
055import org.ametys.core.user.UserIdentity;
056import org.ametys.core.user.population.UserPopulationDAO;
057import org.ametys.plugins.core.user.UserDAO;
058
059/**
060 * Manager for dispatching {@link Event} instances to {@link Observer}s.
061 */
062public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
063{
064    /**
065     * Store a future and the traits of the observer that it will execute
066     * @param future the future
067     * @param traits the traits
068     */
069    public record ObserverFuture(Future future, Set<String> traits) { }
070
071    /** Avalon ROLE. */
072    public static final String ROLE = ObservationManager.class.getName();
073    
074    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
075    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
076    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
077    
078    private static final Comparator<Observer> __OBSERVER_COMPARATOR = Comparator.comparing(Observer::getPriority);
079    
080    /**
081     * The executor service managing the single thread pool. This threads is
082     * used to run non-parallelizable observers.
083     */
084    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
085    
086    /**
087     * The executor service managing the thread pool of asynchronous observers
088     * allowed to run in parallel.
089     */
090    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
091    
092    /** Cocoon context */
093    protected Context _context;
094    
095    /** Service manager */
096    protected ServiceManager _manager;
097    
098    private org.apache.avalon.framework.context.Context _avalonContext;
099    private ObserverExtensionPoint _observerExtPt;
100    private Collection<Observer> _registeredObservers = new ArrayList<>();
101    
102    @Override
103    public void service(ServiceManager manager) throws ServiceException
104    {
105        _manager = manager;
106        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
107    }
108    
109    @Override
110    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
111    {
112        _avalonContext = context;
113        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
114    }
115    
116    /**
117     * Notify of a event which will be dispatch to registered
118     * observers.
119     * @param event the event to notify.
120     * @return The {@link Future} objects of the asynchronous observers
121     */
122    public List<Future> notify(final Event event)
123    {
124        List<Future> result = new ArrayList<>();
125        
126        try
127        {
128            if (getLogger().isDebugEnabled())
129            {
130                getLogger().debug("Receiving " + event);
131            }
132            
133            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
134            {
135                __ALL_EVENTS_LOGGER.info("Receiving " + event);
136            }
137            
138            // Add the comparator to keep the right order with observers from the extension point
139            // and manually registered observers
140            List<Observer> supportedObservers = new ArrayList<>();
141            
142            // Retrieve supported observers from extension point
143            supportedObservers.addAll(_observerExtPt.getSupportingExtensions(event));
144            
145            // Retrieve supported observers from manual registration
146            supportedObservers.addAll(
147                _registeredObservers.stream()
148                    .filter(obs -> obs.supports(event))
149                    .toList()
150            );
151            
152            supportedObservers.sort(__OBSERVER_COMPARATOR);
153            
154            // Observes the event and prepares the asynchronous observes.
155            _putAdditionalArgs(event);
156            _observesEvent(event, supportedObservers, result);
157        }
158        catch (Exception e)
159        {
160            // Observers should never fail, so just log the error
161            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
162        }
163        
164        return result;
165    }
166    
167    /**
168     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
169     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
170     * @return the {@link Future}s of the notified observers during the current request.
171     */
172    public List<ObserverFuture> getFuturesForRequest()
173    {
174        try
175        {
176            return Collections.unmodifiableList(_getFuturesForRequest());
177        }
178        catch (CascadingRuntimeException e)
179        {
180            // can happen if no request
181            // return empty map
182            return List.of();
183        }
184    }
185    
186    private List<ObserverFuture> _getFuturesForRequest() throws CascadingRuntimeException
187    {
188        Request request = ContextHelper.getRequest(_avalonContext);
189        @SuppressWarnings("unchecked")
190        List<ObserverFuture> futuresForCurrentRequest = (List<ObserverFuture>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
191        if (futuresForCurrentRequest == null)
192        {
193            futuresForCurrentRequest = new ArrayList<>();
194            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
195        }
196        return futuresForCurrentRequest;
197    }
198    
199    /**
200     * For all events of the given event id which will be notified during the current request, add an additional argument
201     * @param eventIds The event ids
202     * @param argName The name of the additional argument
203     * @param argValue The value of the additional argument
204     */
205    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
206    {
207        Request request = ContextHelper.getRequest(_avalonContext);
208        
209        // Map eventId -> argName -> argValue
210        @SuppressWarnings("unchecked")
211        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
212        if (additionalArgsForEvents == null)
213        {
214            additionalArgsForEvents = new HashMap<>();
215            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
216        }
217        
218        for (String eventId : eventIds)
219        {
220            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
221            if (additionalArgs == null)
222            {
223                additionalArgs = new HashMap<>();
224                additionalArgsForEvents.put(eventId, additionalArgs);
225            }
226            
227            additionalArgs.put(argName, argValue);
228        }
229    }
230    
231    /**
232     * For all events of the given event id which will be notified during the current request, removes the additional argument
233     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
234     * @param eventIds The event ids
235     * @param argName The name of the additional argument to remove
236     */
237    public void removeArgumentForEvents(String[] eventIds, String argName)
238    {
239        Request request = ContextHelper.getRequest(_avalonContext);
240        
241        // Map eventId -> argName -> argValue
242        @SuppressWarnings("unchecked")
243        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
244        if (additionalArgsForEvents == null)
245        {
246            return;
247        }
248        
249        for (String eventId : eventIds)
250        {
251            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
252            if (additionalArgs == null)
253            {
254                break;
255            }
256            
257            additionalArgs.remove(argName);
258        }
259    }
260    
261    private void _putAdditionalArgs(final Event event)
262    {
263        String eventId = event.getId();
264        
265        Request request = null;
266        try
267        {
268            request = ContextHelper.getRequest(_avalonContext);
269        }
270        catch (CascadingRuntimeException e)
271        {
272            // can happen if no request
273            // return empty list
274        }
275        if (request != null)
276        {
277            @SuppressWarnings("unchecked")
278            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
279            if (additionalArgsForEvents != null)
280            {
281                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
282                if (additionalArgs != null)
283                {
284                    event.getArguments().putAll(additionalArgs);
285                }
286            }
287        }
288    }
289    
290    /**
291     * Observes the event
292     * @param event The event
293     * @param supportedObservers list of observers
294     * @param result The result object to fill
295     * @throws Exception on error
296     */
297    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
298    {
299        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
300        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
301        
302        Map<String, Object> transientVars = new HashMap<>();
303        for (Observer supportedObserver : supportedObservers)
304        {
305            if (getLogger().isInfoEnabled())
306            {
307                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
308            }
309            
310            // Observe current event
311            if (supportedObserver instanceof AsyncObserver)
312            {
313                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
314                boolean parallelizable = asyncObserver.parallelizable();
315                
316                if (getLogger().isDebugEnabled())
317                {
318                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
319                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
320                }
321                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
322                {
323                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
324                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
325                }
326                
327                if (parallelizable)
328                {
329                    parallelizableAsyncObservers.add(asyncObserver);
330                }
331                else
332                {
333                    nonParallelizableAsyncObservers.add(asyncObserver);
334                }
335            }
336            else
337            {
338                try
339                {
340                    supportedObserver.observe(event, transientVars);
341                }
342                catch (Throwable e)
343                {
344                    // Observation process should continue
345                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
346                }
347            }
348        }
349        
350        // Observe for async observer.
351        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
352        {
353            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
354        }
355    }
356
357    /**
358     * Async observe through a thread pool
359     * @param parallelObservers parallelizable asynchronous observers
360     * @param nonParallelObservers non parallelizable asynchronous observers
361     * @param event The event to observe
362     * @param transientVars The observer transient vars
363     * @param result The future results
364     */
365    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
366    {
367        // Keep in current request the futures of the curent request
368        List<ObserverFuture> futuresForCurrentRequest = null;
369        try
370        {
371            futuresForCurrentRequest = _getFuturesForRequest();
372        }
373        catch (CascadingRuntimeException e)
374        {
375            // can happen if no request
376            // do nothing though
377        }
378
379        if (__SINGLE_THREAD_EXECUTOR == null)
380        {
381            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
382            
383            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
384            
385            // 1 thread per core
386            __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
387        }
388        
389        if (!parallelObservers.isEmpty())
390        {
391            for (AsyncObserver observer :  parallelObservers)
392            {
393                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
394                result.add(future);
395                if (futuresForCurrentRequest != null)
396                {
397                    futuresForCurrentRequest.add(new ObserverFuture(future, observer.getTraits()));
398                }
399            }
400        }
401        
402        if (!nonParallelObservers.isEmpty())
403        {
404            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
405            result.add(future);
406            if (futuresForCurrentRequest != null)
407            {
408                Set<String> traits = new HashSet<>();
409                for (Observer observer : nonParallelObservers)
410                {
411                    traits.addAll(observer.getTraits());
412                }
413                futuresForCurrentRequest.add(new ObserverFuture(future, traits));
414            }
415        }
416    }
417    
418    /**
419     * Runnable to be used for asynchronous calls
420     */
421    abstract class AbstractAsyncObserve implements Callable<Object>
422    {
423        /** event to observe */
424        protected final Event _event;
425        protected final Map<String, Object> _transientVars;
426        protected final org.apache.avalon.framework.logger.Logger _logger;
427        protected final Logger _allEventLogger;
428        
429        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
430        {
431            _event = event;
432            _transientVars = transientVars;
433            _logger = logger;
434            _allEventLogger = allEventLogger;
435        }
436        
437        @Override
438        public Object call()
439        {
440            Map<String, Object> environmentInformation = null;
441            Integer computedResult = 0;
442            try
443            {
444                // Create the environment.
445                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
446                
447                _setUserInSession(environmentInformation);
448                
449                _observe();
450            }
451            catch (Exception e)
452            {
453                // Observer must never fail, so just log the error
454                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
455                computedResult = -1;
456            }
457            finally
458            {
459                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
460                
461                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
462            }
463            
464            return computedResult;
465        }
466        
467        private boolean _setUserInSession(Map<String, Object> environmentInformation)
468        {
469            UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer())
470                    .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
471            
472            BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
473            Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
474
475            AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
476            
477            return true;
478        }
479        
480        /**
481         * Abstract observe method where the observation should be done
482         * @throws Exception on error
483         */
484        protected abstract void _observe() throws Exception;
485    }
486    
487    /**
488     * Runnable for parallel observers
489     */
490    class ParallelAsyncObserve extends AbstractAsyncObserve
491    {
492        private final AsyncObserver _observer;
493        
494        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
495        {
496            super(event, transientVars, logger, allEventLogger);
497            _observer = observer;
498        }
499        
500        @Override
501        protected void _observe() throws Exception
502        {
503            if (_logger.isDebugEnabled())
504            {
505                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
506            }
507            if (_allEventLogger.isDebugEnabled())
508            {
509                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
510            }
511            
512            _observer.observe(_event, _transientVars);
513        }
514    }
515    
516    /**
517     * Runnable for non parallel observers
518     */
519    class NonParallelAsyncObserve extends AbstractAsyncObserve
520    {
521        private final Collection<AsyncObserver> _observers;
522        
523        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
524        {
525            super(event, transientVars, logger, allEventLogger);
526            _observers = observers;
527        }
528        
529        @Override
530        protected void _observe() throws Exception
531        {
532            for (AsyncObserver observer : _observers)
533            {
534                if (_logger.isDebugEnabled())
535                {
536                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
537                }
538                if (_allEventLogger.isDebugEnabled())
539                {
540                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
541                }
542                
543                observer.observe(_event, _transientVars);
544            }
545        }
546    }
547    
548    /**
549     * Thread factory for async observers.
550     * Set the thread name format and marks the thread as daemon.
551     */
552    static class AsyncObserveThreadFactory implements ThreadFactory
553    {
554        private ThreadFactory _defaultThreadFactory;
555        private String _nameFormat;
556        private AtomicLong _count;
557        
558        public AsyncObserveThreadFactory()
559        {
560            _defaultThreadFactory = Executors.defaultThreadFactory();
561            _nameFormat = "ametys-async-observe-%d";
562            _count = new AtomicLong(0);
563        }
564        
565        public Thread newThread(Runnable r)
566        {
567            Thread thread = _defaultThreadFactory.newThread(r);
568            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
569            thread.setDaemon(true);
570            
571            return thread;
572        }
573    }
574    
575    @Override
576    public void dispose()
577    {
578        if (__SINGLE_THREAD_EXECUTOR != null)
579        {
580            __SINGLE_THREAD_EXECUTOR.shutdownNow();
581            __SINGLE_THREAD_EXECUTOR = null;
582        }
583        if (__PARALLEL_THREAD_EXECUTOR != null)
584        {
585            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
586            __PARALLEL_THREAD_EXECUTOR = null;
587        }
588        
589        _context = null;
590        _manager = null;
591    }
592
593    /**
594     * Registers an {@link Observer}.
595     * @param observer the {@link Observer}.
596     */
597    public void registerObserver(Observer observer)
598    {
599        _registeredObservers.add(observer);
600    }
601    
602    /**
603     * Unregisters an {@link Observer}.
604     * @param observer the {@link Observer}.
605     */
606    public void unregisterObserver(Observer observer)
607    {
608        _registeredObservers.remove(observer);
609    }
610}