001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.atomic.AtomicLong;
031
032import org.apache.avalon.framework.CascadingRuntimeException;
033import org.apache.avalon.framework.activity.Disposable;
034import org.apache.avalon.framework.component.Component;
035import org.apache.avalon.framework.context.ContextException;
036import org.apache.avalon.framework.context.Contextualizable;
037import org.apache.avalon.framework.logger.AbstractLogEnabled;
038import org.apache.avalon.framework.service.ServiceException;
039import org.apache.avalon.framework.service.ServiceManager;
040import org.apache.avalon.framework.service.Serviceable;
041import org.apache.cocoon.Constants;
042import org.apache.cocoon.components.ContextHelper;
043import org.apache.cocoon.environment.Context;
044import org.apache.cocoon.environment.Request;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.ametys.core.engine.BackgroundEngineHelper;
049
050/**
051 * Manager for dispatching {@link Event} instances to {@link Observer}s.
052 */
053public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
054{
055    /** Avalon ROLE. */
056    public static final String ROLE = ObservationManager.class.getName();
057    
058    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
059    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
060    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
061    
062    /**
063     * The executor service managing the single thread pool. This threads is
064     * used to run non-parallelizable observers.
065     */
066    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
067    
068    /**
069     * The executor service managing the thread pool of asynchronous observers
070     * allowed to run in parallel.
071     */
072    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
073    
074    /** Cocoon context */
075    protected Context _context;
076    
077    /** Service manager */
078    protected ServiceManager _manager;
079    
080    private org.apache.avalon.framework.context.Context _avalonContext;
081    private ObserverExtensionPoint _observerExtPt;
082    private Collection<Observer> _registeredObservers = new ArrayList<>();
083    
084    @Override
085    public void service(ServiceManager manager) throws ServiceException
086    {
087        _manager = manager;
088        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
089    }
090    
091    @Override
092    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
093    {
094        _avalonContext = context;
095        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
096    }
097    
098    /**
099     * Notify of a event which will be dispatch to registered
100     * observers.
101     * @param event the event to notify.
102     * @return The {@link Future} objects of the asynchronous observers
103     */
104    public List<Future> notify(final Event event)
105    {
106        List<Future> result = new ArrayList<>();
107        
108        try
109        {
110            if (getLogger().isDebugEnabled())
111            {
112                getLogger().debug("Receiving " + event);
113            }
114            
115            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
116            {
117                __ALL_EVENTS_LOGGER.info("Receiving " + event);
118            }
119            
120            List<Observer> supportedObservers = new ArrayList<>();
121            
122            // Retrieve supported observers from extension point
123            for (String observerId : _observerExtPt.getExtensionsIds())
124            {
125                Observer observer = _observerExtPt.getExtension(observerId);
126                _addSupportedObserver(supportedObservers, observer, event);
127            }
128            
129            // Retrieve supported observers from manual registration
130            for (Observer observer : _registeredObservers)
131            {
132                _addSupportedObserver(supportedObservers, observer, event);
133            }
134            
135            // Order observers (0 is first, Integer.MAX_INT is last)
136            Collections.sort(supportedObservers, Comparator.comparingInt(
137                observer -> observer.getPriority(event)
138            ));
139            
140            // Observes the event and prepares the asynchronous observes.
141            _putAdditionalArgs(event);
142            _observesEvent(event, supportedObservers, result);
143        }
144        catch (Exception e)
145        {
146            // Observers should never fail, so just log the error
147            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
148        }
149        
150        // Keep in current request the futures of the curent request
151        try
152        {
153            List<Future> futuresForCurrentRequest = _getFuturesForRequest();
154            futuresForCurrentRequest.addAll(result);
155        }
156        catch (CascadingRuntimeException e)
157        {
158            // can happen if no request
159            // do nothing though
160        }
161        
162        return result;
163    }
164    
165    private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event)
166    {
167        if (getLogger().isDebugEnabled())
168        {
169            getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
170        }
171        
172        if (observer.supports(event))
173        {
174            if (getLogger().isDebugEnabled())
175            {
176                getLogger().debug("Event: " + event + " supported for observer: " + observer);
177            }
178            
179            supportedObservers.add(observer);
180        }
181    }
182    
183    /**
184     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
185     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
186     * @return the {@link Future}s of the notified observers during the current request.
187     */
188    public List<Future> getFuturesForRequest()
189    {
190        try
191        {
192            return Collections.unmodifiableList(_getFuturesForRequest());
193        }
194        catch (CascadingRuntimeException e)
195        {
196            // can happen if no request
197            // return empty list
198            return Collections.emptyList();
199        }
200    }
201    
202    private List<Future> _getFuturesForRequest() throws CascadingRuntimeException
203    {
204        Request request = ContextHelper.getRequest(_avalonContext);
205        @SuppressWarnings("unchecked")
206        List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
207        if (futuresForCurrentRequest == null)
208        {
209            futuresForCurrentRequest = new ArrayList<>();
210            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
211        }
212        return futuresForCurrentRequest;
213    }
214    
215    /**
216     * For all events of the given event id which will be notified during the current request, add an additional argument
217     * @param eventIds The event ids
218     * @param argName The name of the additional argument
219     * @param argValue The value of the additional argument
220     */
221    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
222    {
223        Request request = ContextHelper.getRequest(_avalonContext);
224        
225        // Map eventId -> argName -> argValue
226        @SuppressWarnings("unchecked")
227        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
228        if (additionalArgsForEvents == null)
229        {
230            additionalArgsForEvents = new HashMap<>();
231            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
232        }
233        
234        for (String eventId : eventIds)
235        {
236            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
237            if (additionalArgs == null)
238            {
239                additionalArgs = new HashMap<>();
240                additionalArgsForEvents.put(eventId, additionalArgs);
241            }
242            
243            additionalArgs.put(argName, argValue);
244        }
245    }
246    
247    /**
248     * For all events of the given event id which will be notified during the current request, removes the additional argument
249     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
250     * @param eventIds The event ids
251     * @param argName The name of the additional argument to remove
252     */
253    public void removeArgumentForEvents(String[] eventIds, String argName)
254    {
255        Request request = ContextHelper.getRequest(_avalonContext);
256        
257        // Map eventId -> argName -> argValue
258        @SuppressWarnings("unchecked")
259        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
260        if (additionalArgsForEvents == null)
261        {
262            return;
263        }
264        
265        for (String eventId : eventIds)
266        {
267            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
268            if (additionalArgs == null)
269            {
270                break;
271            }
272            
273            additionalArgs.remove(argName);
274        }
275    }
276    
277    private void _putAdditionalArgs(final Event event)
278    {
279        String eventId = event.getId();
280        
281        Request request = null;
282        try
283        {
284            request = ContextHelper.getRequest(_avalonContext);
285        }
286        catch (CascadingRuntimeException e)
287        {
288            // can happen if no request
289            // return empty list
290        }
291        if (request != null)
292        {
293            @SuppressWarnings("unchecked")
294            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
295            if (additionalArgsForEvents != null)
296            {
297                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
298                if (additionalArgs != null)
299                {
300                    event.getArguments().putAll(additionalArgs);
301                }
302            }
303        }
304    }
305    
306    /**
307     * Observes the event
308     * @param event The event
309     * @param supportedObservers list of observers
310     * @param result The result object to fill 
311     * @throws Exception on error
312     */
313    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
314    {
315        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
316        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
317        
318        Map<String, Object> transientVars = new HashMap<>();
319        for (Observer supportedObserver : supportedObservers)
320        {
321            if (getLogger().isInfoEnabled())
322            {
323                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
324            }
325            
326            // Observe current event
327            if (supportedObserver instanceof AsyncObserver)
328            {
329                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
330                boolean parallelizable = asyncObserver.parallelizable();
331                
332                if (getLogger().isDebugEnabled())
333                {
334                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
335                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
336                }
337                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
338                {
339                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
340                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
341                }
342                
343                if (parallelizable)
344                {
345                    parallelizableAsyncObservers.add(asyncObserver);
346                }
347                else
348                {
349                    nonParallelizableAsyncObservers.add(asyncObserver);
350                }
351            }
352            else
353            {
354                try
355                {
356                    supportedObserver.observe(event, transientVars);
357                }
358                catch (Throwable e)
359                {
360                    // Observation process should continue
361                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
362                }
363            }
364        }
365        
366        // Observe for async observer.
367        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
368        {
369            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
370        }
371    }
372
373    /**
374     * Async observe through a thread pool
375     * @param parallelObservers parallelizable asynchronous observers
376     * @param nonParallelObservers non parallelizable asynchronous observers
377     * @param event The event to observe
378     * @param transientVars The observer transient vars
379     * @param result The future results
380     */
381    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
382    {
383        if (__SINGLE_THREAD_EXECUTOR == null)
384        {
385            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
386            
387            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
388            
389            // 1 thread per core
390            __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
391        }
392        
393        if (!parallelObservers.isEmpty())
394        {
395            for (AsyncObserver observer :  parallelObservers)
396            {
397                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
398                result.add(future);
399            }
400        }
401        
402        if (!nonParallelObservers.isEmpty())
403        {
404            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
405            result.add(future);
406        }
407    }
408    
409    /**
410     * Runnable to be used for asynchronous calls 
411     */
412    abstract class AbstractAsyncObserve implements Callable<Object>
413    {
414        /** event to observe */
415        protected final Event _event;
416        protected final Map<String, Object> _transientVars;
417        protected final org.apache.avalon.framework.logger.Logger _logger;
418        protected final Logger _allEventLogger;
419        
420        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
421        {
422            _event = event;
423            _transientVars = transientVars;
424            _logger = logger;
425            _allEventLogger = allEventLogger;
426        }
427        
428        @Override
429        public Object call()
430        {
431            Map<String, Object> environmentInformation = null;
432            Integer computedResult = 0;
433            try
434            {
435                // Create the environment.
436                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
437                
438                _observe();
439            }
440            catch (Exception e)
441            {
442                // Observer must never fail, so just log the error
443                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
444                computedResult = -1;
445            }
446            finally
447            {
448                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
449                
450                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
451            }
452            
453            return computedResult;
454        }
455        
456        /**
457         * Abstract observe method where the observation should be done
458         * @throws Exception on error
459         */
460        protected abstract void _observe() throws Exception;
461    }
462    
463    /**
464     * Runnable for parallel observers 
465     */
466    class ParallelAsyncObserve extends AbstractAsyncObserve
467    {
468        private final AsyncObserver _observer;
469        
470        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
471        {
472            super(event, transientVars, logger, allEventLogger);
473            _observer = observer;
474        }
475        
476        @Override
477        protected void _observe() throws Exception
478        {
479            if (_logger.isDebugEnabled())
480            {
481                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
482            }
483            if (_allEventLogger.isDebugEnabled())
484            {
485                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
486            }
487            
488            _observer.observe(_event, _transientVars);
489        }
490    }
491    
492    /**
493     * Runnable for non parallel observers 
494     */
495    class NonParallelAsyncObserve extends AbstractAsyncObserve
496    {
497        private final Collection<AsyncObserver> _observers;
498        
499        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
500        {
501            super(event, transientVars, logger, allEventLogger);
502            _observers = observers;
503        }
504        
505        @Override
506        protected void _observe() throws Exception
507        {
508            for (AsyncObserver observer : _observers)
509            {
510                if (_logger.isDebugEnabled())
511                {
512                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
513                }
514                if (_allEventLogger.isDebugEnabled())
515                {
516                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
517                }
518                
519                observer.observe(_event, _transientVars);
520            }
521        }
522    }
523    
524    /**
525     * Thread factory for async observers.
526     * Set the thread name format and marks the thread as daemon. 
527     */
528    static class AsyncObserveThreadFactory implements ThreadFactory
529    {
530        private static ThreadFactory _defaultThreadFactory;
531        private static String _nameFormat;
532        private static AtomicLong _count;
533        
534        public AsyncObserveThreadFactory()
535        {
536            _defaultThreadFactory = Executors.defaultThreadFactory();
537            _nameFormat = "ametys-async-observe-%d";
538            _count = new AtomicLong(0);
539        }
540        
541        public Thread newThread(Runnable r)
542        {
543            Thread thread = _defaultThreadFactory.newThread(r);
544            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
545            thread.setDaemon(true);
546            
547            return thread;
548        }
549    }
550    
551    @Override
552    public void dispose()
553    {
554        if (__SINGLE_THREAD_EXECUTOR != null)
555        {
556            __SINGLE_THREAD_EXECUTOR.shutdownNow();
557            __SINGLE_THREAD_EXECUTOR = null;
558        }
559        if (__PARALLEL_THREAD_EXECUTOR != null)
560        {
561            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
562            __PARALLEL_THREAD_EXECUTOR = null;
563        }
564        
565        _context = null;
566        _manager = null;
567    }
568
569    /**
570     * Registers an {@link Observer}.
571     * @param observer the {@link Observer}.
572     */
573    public void registerObserver(Observer observer)
574    {
575        _registeredObservers.add(observer);
576    }
577    
578    /**
579     * Unregisters an {@link Observer}.
580     * @param observer the {@link Observer}.
581     */
582    public void unregisterObserver(Observer observer)
583    {
584        _registeredObservers.remove(observer);
585    }
586}