001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.avalon.framework.CascadingRuntimeException;
036import org.apache.avalon.framework.activity.Disposable;
037import org.apache.avalon.framework.component.Component;
038import org.apache.avalon.framework.context.ContextException;
039import org.apache.avalon.framework.context.Contextualizable;
040import org.apache.avalon.framework.logger.AbstractLogEnabled;
041import org.apache.avalon.framework.service.ServiceException;
042import org.apache.avalon.framework.service.ServiceManager;
043import org.apache.avalon.framework.service.Serviceable;
044import org.apache.cocoon.Constants;
045import org.apache.cocoon.components.ContextHelper;
046import org.apache.cocoon.environment.Context;
047import org.apache.cocoon.environment.Request;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.ametys.core.engine.BackgroundEngineHelper;
052
053/**
054 * Manager for dispatching {@link Event} instances to {@link Observer}s.
055 */
056public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
057{
058    /** Avalon ROLE. */
059    public static final String ROLE = ObservationManager.class.getName();
060    
061    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
062    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
063    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
064    
    /**
     * The executor service managing the single-thread pool. This thread is
     * used to run non-parallelizable observers.
     */
069    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
070    
071    /**
072     * The executor service managing the thread pool of asynchronous observers
073     * allowed to run in parallel.
074     */
075    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
076    
077    /** Cocoon context */
078    protected Context _context;
079    
080    /** Service manager */
081    protected ServiceManager _manager;
082    
083    private org.apache.avalon.framework.context.Context _avalonContext;
084    private ObserverExtensionPoint _observerExtPt;
085    private Collection<Observer> _registeredObservers = new ArrayList<>();
086    
087    @Override
088    public void service(ServiceManager manager) throws ServiceException
089    {
090        _manager = manager;
091        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
092    }
093    
094    @Override
095    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
096    {
097        _avalonContext = context;
098        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
099    }
100    
101    /**
102     * Notify of a event which will be dispatch to registered
103     * observers.
104     * @param event the event to notify.
105     * @return The {@link Future} objects of the asynchronous observers
106     */
107    public List<Future> notify(final Event event)
108    {
109        List<Future> result = new ArrayList<>();
110        
111        try
112        {
113            if (getLogger().isDebugEnabled())
114            {
115                getLogger().debug("Receiving " + event);
116            }
117            
118            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
119            {
120                __ALL_EVENTS_LOGGER.info("Receiving " + event);
121            }
122            
123            List<Observer> supportedObservers = new ArrayList<>();
124            
125            // Retrieve supported observers from extension point
126            for (String observerId : _observerExtPt.getExtensionsIds())
127            {
128                Observer observer = _observerExtPt.getExtension(observerId);
129                _addSupportedObserver(supportedObservers, observer, event);
130            }
131            
132            // Retrieve supported observers from manual registration
133            for (Observer observer : _registeredObservers)
134            {
135                _addSupportedObserver(supportedObservers, observer, event);
136            }
137            
138            // Order observers (0 is first, Integer.MAX_INT is last)
139            Collections.sort(supportedObservers, Comparator.comparingInt(
140                observer -> observer.getPriority(event)
141            ));
142            
143            // Observes the event and prepares the asynchronous observes.
144            _putAdditionalArgs(event);
145            _observesEvent(event, supportedObservers, result);
146        }
147        catch (Exception e)
148        {
149            // Observers should never fail, so just log the error
150            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
151        }
152        
153        // Keep in current request the futures of the curent request
154        try
155        {
156            List<Future> futuresForCurrentRequest = _getFuturesForRequest();
157            futuresForCurrentRequest.addAll(result);
158        }
159        catch (CascadingRuntimeException e)
160        {
161            // can happen if no request
162            // do nothing though
163        }
164        
165        return result;
166    }
167    
168    private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event)
169    {
170        if (getLogger().isDebugEnabled())
171        {
172            getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
173        }
174        
175        if (observer.supports(event))
176        {
177            if (getLogger().isDebugEnabled())
178            {
179                getLogger().debug("Event: " + event + " supported for observer: " + observer);
180            }
181            
182            supportedObservers.add(observer);
183        }
184    }
185    
186    /**
187     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
188     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
189     * @return the {@link Future}s of the notified observers during the current request.
190     */
191    public List<Future> getFuturesForRequest()
192    {
193        try
194        {
195            return Collections.unmodifiableList(_getFuturesForRequest());
196        }
197        catch (CascadingRuntimeException e)
198        {
199            // can happen if no request
200            // return empty list
201            return Collections.emptyList();
202        }
203    }
204    
205    private List<Future> _getFuturesForRequest() throws CascadingRuntimeException
206    {
207        Request request = ContextHelper.getRequest(_avalonContext);
208        @SuppressWarnings("unchecked")
209        List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
210        if (futuresForCurrentRequest == null)
211        {
212            futuresForCurrentRequest = new ArrayList<>();
213            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
214        }
215        return futuresForCurrentRequest;
216    }
217    
218    /**
219     * For all events of the given event id which will be notified during the current request, add an additional argument
220     * @param eventIds The event ids
221     * @param argName The name of the additional argument
222     * @param argValue The value of the additional argument
223     */
224    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
225    {
226        Request request = ContextHelper.getRequest(_avalonContext);
227        
228        // Map eventId -> argName -> argValue
229        @SuppressWarnings("unchecked")
230        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
231        if (additionalArgsForEvents == null)
232        {
233            additionalArgsForEvents = new HashMap<>();
234            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
235        }
236        
237        for (String eventId : eventIds)
238        {
239            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
240            if (additionalArgs == null)
241            {
242                additionalArgs = new HashMap<>();
243                additionalArgsForEvents.put(eventId, additionalArgs);
244            }
245            
246            additionalArgs.put(argName, argValue);
247        }
248    }
249    
250    /**
251     * For all events of the given event id which will be notified during the current request, removes the additional argument
252     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
253     * @param eventIds The event ids
254     * @param argName The name of the additional argument to remove
255     */
256    public void removeArgumentForEvents(String[] eventIds, String argName)
257    {
258        Request request = ContextHelper.getRequest(_avalonContext);
259        
260        // Map eventId -> argName -> argValue
261        @SuppressWarnings("unchecked")
262        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
263        if (additionalArgsForEvents == null)
264        {
265            return;
266        }
267        
268        for (String eventId : eventIds)
269        {
270            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
271            if (additionalArgs == null)
272            {
273                break;
274            }
275            
276            additionalArgs.remove(argName);
277        }
278    }
279    
280    private void _putAdditionalArgs(final Event event)
281    {
282        String eventId = event.getId();
283        
284        Request request = null;
285        try
286        {
287            request = ContextHelper.getRequest(_avalonContext);
288        }
289        catch (CascadingRuntimeException e)
290        {
291            // can happen if no request
292            // return empty list
293        }
294        if (request != null)
295        {
296            @SuppressWarnings("unchecked")
297            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
298            if (additionalArgsForEvents != null)
299            {
300                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
301                if (additionalArgs != null)
302                {
303                    event.getArguments().putAll(additionalArgs);
304                }
305            }
306        }
307    }
308    
309    /**
310     * Observes the event
311     * @param event The event
312     * @param supportedObservers list of observers
313     * @param result The result object to fill 
314     * @throws Exception on error
315     */
316    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
317    {
318        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
319        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
320        
321        Map<String, Object> transientVars = new HashMap<>();
322        for (Observer supportedObserver : supportedObservers)
323        {
324            if (getLogger().isInfoEnabled())
325            {
326                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
327            }
328            
329            // Observe current event
330            if (supportedObserver instanceof AsyncObserver)
331            {
332                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
333                boolean parallelizable = asyncObserver.parallelizable();
334                
335                if (getLogger().isDebugEnabled())
336                {
337                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
338                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
339                }
340                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
341                {
342                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
343                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
344                }
345                
346                if (parallelizable)
347                {
348                    parallelizableAsyncObservers.add(asyncObserver);
349                }
350                else
351                {
352                    nonParallelizableAsyncObservers.add(asyncObserver);
353                }
354            }
355            else
356            {
357                try
358                {
359                    supportedObserver.observe(event, transientVars);
360                }
361                catch (Throwable e)
362                {
363                    // Observation process should continue
364                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
365                }
366            }
367        }
368        
369        // Observe for async observer.
370        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
371        {
372            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
373        }
374    }
375
376    /**
377     * Async observe through a thread pool
378     * @param parallelObservers parallelizable asynchronous observers
379     * @param nonParallelObservers non parallelizable asynchronous observers
380     * @param event The event to observe
381     * @param transientVars The observer transient vars
382     * @param result The future results
383     */
384    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
385    {
386        if (__SINGLE_THREAD_EXECUTOR == null)
387        {
388            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
389            
390            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
391            
392            // 10 threads per core
393            __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS,
394                    new LinkedBlockingQueue<Runnable>(), threadFactory);
395        }
396        
397        if (!parallelObservers.isEmpty())
398        {
399            for (AsyncObserver observer :  parallelObservers)
400            {
401                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
402                result.add(future);
403            }
404        }
405        
406        if (!nonParallelObservers.isEmpty())
407        {
408            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
409            result.add(future);
410        }
411    }
412    
413    /**
414     * Runnable to be used for asynchronous calls 
415     */
416    abstract class AbstractAsyncObserve implements Callable<Object>
417    {
418        /** event to observe */
419        protected final Event _event;
420        protected final Map<String, Object> _transientVars;
421        protected final org.apache.avalon.framework.logger.Logger _logger;
422        protected final Logger _allEventLogger;
423        
424        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
425        {
426            _event = event;
427            _transientVars = transientVars;
428            _logger = logger;
429            _allEventLogger = allEventLogger;
430        }
431        
432        @Override
433        public Object call()
434        {
435            Map<String, Object> environmentInformation = null;
436            Integer computedResult = 0;
437            try
438            {
439                // Create the environment.
440                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
441                
442                _observe();
443            }
444            catch (Exception e)
445            {
446                // Observer must never fail, so just log the error
447                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
448                computedResult = -1;
449            }
450            finally
451            {
452                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
453                
454                // Leave the environment.
455                if (environmentInformation != null)
456                {
457                    BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
458                }
459            }
460            
461            return computedResult;
462        }
463        
464        /**
465         * Abstract observe method where the observation should be done
466         * @throws Exception on error
467         */
468        protected abstract void _observe() throws Exception;
469    }
470    
471    /**
472     * Runnable for parallel observers 
473     */
474    class ParallelAsyncObserve extends AbstractAsyncObserve
475    {
476        private final AsyncObserver _observer;
477        
478        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
479        {
480            super(event, transientVars, logger, allEventLogger);
481            _observer = observer;
482        }
483        
484        @Override
485        protected void _observe() throws Exception
486        {
487            if (_logger.isDebugEnabled())
488            {
489                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
490            }
491            if (_allEventLogger.isDebugEnabled())
492            {
493                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
494            }
495            
496            _observer.observe(_event, _transientVars);
497        }
498    }
499    
500    /**
501     * Runnable for non parallel observers 
502     */
503    class NonParallelAsyncObserve extends AbstractAsyncObserve
504    {
505        private final Collection<AsyncObserver> _observers;
506        
507        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
508        {
509            super(event, transientVars, logger, allEventLogger);
510            _observers = observers;
511        }
512        
513        @Override
514        protected void _observe() throws Exception
515        {
516            for (AsyncObserver observer : _observers)
517            {
518                if (_logger.isDebugEnabled())
519                {
520                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
521                }
522                if (_allEventLogger.isDebugEnabled())
523                {
524                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
525                }
526                
527                observer.observe(_event, _transientVars);
528            }
529        }
530    }
531    
532    /**
533     * Thread factory for async observers.
534     * Set the thread name format and marks the thread as daemon. 
535     */
536    static class AsyncObserveThreadFactory implements ThreadFactory
537    {
538        private static ThreadFactory _defaultThreadFactory;
539        private static String _nameFormat;
540        private static AtomicLong _count;
541        
542        public AsyncObserveThreadFactory()
543        {
544            _defaultThreadFactory = Executors.defaultThreadFactory();
545            _nameFormat = "ametys-async-observe-%d";
546            _count = new AtomicLong(0);
547        }
548        
549        public Thread newThread(Runnable r)
550        {
551            Thread thread = _defaultThreadFactory.newThread(r);
552            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
553            thread.setDaemon(true);
554            
555            return thread;
556        }
557    }
558    
559    @Override
560    public void dispose()
561    {
562        if (__SINGLE_THREAD_EXECUTOR != null)
563        {
564            __SINGLE_THREAD_EXECUTOR.shutdownNow();
565            __SINGLE_THREAD_EXECUTOR = null;
566        }
567        if (__PARALLEL_THREAD_EXECUTOR != null)
568        {
569            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
570            __PARALLEL_THREAD_EXECUTOR = null;
571        }
572        
573        _context = null;
574        _manager = null;
575    }
576
577    /**
578     * Registers an {@link Observer}.
579     * @param observer the {@link Observer}.
580     */
581    public void registerObserver(Observer observer)
582    {
583        _registeredObservers.add(observer);
584    }
585    
586    /**
587     * Unregisters an {@link Observer}.
588     * @param observer the {@link Observer}.
589     */
590    public void unregisterObserver(Observer observer)
591    {
592        _registeredObservers.remove(observer);
593    }
594}