001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.avalon.framework.CascadingRuntimeException;
036import org.apache.avalon.framework.activity.Disposable;
037import org.apache.avalon.framework.component.Component;
038import org.apache.avalon.framework.context.ContextException;
039import org.apache.avalon.framework.context.Contextualizable;
040import org.apache.avalon.framework.logger.AbstractLogEnabled;
041import org.apache.avalon.framework.service.ServiceException;
042import org.apache.avalon.framework.service.ServiceManager;
043import org.apache.avalon.framework.service.Serviceable;
044import org.apache.cocoon.Constants;
045import org.apache.cocoon.components.ContextHelper;
046import org.apache.cocoon.environment.Context;
047import org.apache.cocoon.environment.Request;
048import org.slf4j.Logger;
049import org.slf4j.LoggerFactory;
050
051import org.ametys.core.engine.BackgroundEngineHelper;
052
053/**
054 * Manager for dispatching {@link Event} instances to {@link Observer}s.
055 */
056public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
057{
058    /** Avalon ROLE. */
059    public static final String ROLE = ObservationManager.class.getName();
060    
061    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
062    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
063    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
064    
065    /**
066     * The executor service managing the single thread pool. This threads is
067     * used to run non-parallelizable observers.
068     */
069    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
070    
071    /**
072     * The executor service managing the thread pool of asynchronous observers
073     * allowed to run in parallel.
074     */
075    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
076    
077    /** Cocoon context */
078    protected Context _context;
079    
080    /** Service manager */
081    protected ServiceManager _manager;
082    
083    private org.apache.avalon.framework.context.Context _avalonContext;
084    private ObserverExtensionPoint _observerExtPt;
085    private Collection<Observer> _registeredObservers = new ArrayList<>();
086    
087    @Override
088    public void service(ServiceManager manager) throws ServiceException
089    {
090        _manager = manager;
091        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
092    }
093    
094    @Override
095    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
096    {
097        _avalonContext = context;
098        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
099    }
100    
101    /**
102     * Notify of a event which will be dispatch to registered
103     * observers.
104     * @param event the event to notify.
105     * @return The {@link Future} objects of the asynchronous observers
106     */
107    public List<Future> notify(final Event event)
108    {
109        List<Future> result = new ArrayList<>();
110        
111        try
112        {
113            if (getLogger().isDebugEnabled())
114            {
115                getLogger().debug("Receiving " + event);
116            }
117            
118            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
119            {
120                __ALL_EVENTS_LOGGER.info("Receiving " + event);
121            }
122            
123            List<Observer> supportedObservers = new ArrayList<>();
124            
125            // Retrieve supported observers
126            for (String observerId : _observerExtPt.getExtensionsIds())
127            {
128                Observer observer = _observerExtPt.getExtension(observerId);
129                
130                if (getLogger().isDebugEnabled())
131                {
132                    getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
133                }
134                
135                if (observer.supports(event))
136                {
137                    if (getLogger().isDebugEnabled())
138                    {
139                        getLogger().debug("Event: " + event + " supported for observer: " + observer);
140                    }
141                    
142                    supportedObservers.add(observer);
143                }
144            }
145            
146            // Order observers (0 is first, Integer.MAX_INT is last)
147            Collections.sort(supportedObservers, new Comparator<Observer>()
148            {
149                @Override
150                public int compare(Observer o1, Observer o2)
151                {
152                    return new Integer(o1.getPriority(event)).compareTo(new Integer(o2.getPriority(event)));
153                }
154        
155            });
156            
157            // Observes the event and prepares the asynchronous observes.
158            _putAdditionalArgs(event);
159            _observesEvent(event, supportedObservers, result);
160        }
161        catch (Exception e)
162        {
163            // Observers should never fail, so just log the error
164            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
165        }
166        
167        // Keep in current request the futures of the curent request
168        try
169        {
170            List<Future> futuresForCurrentRequest = _getFuturesForRequest();
171            futuresForCurrentRequest.addAll(result);
172        }
173        catch (CascadingRuntimeException e)
174        {
175            // can happen if no request
176            // do nothing though
177        }
178        
179        return result;
180    }
181    
182    /**
183     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
184     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
185     * @return the {@link Future}s of the notified observers during the current request.
186     */
187    public List<Future> getFuturesForRequest()
188    {
189        try
190        {
191            return Collections.unmodifiableList(_getFuturesForRequest());
192        }
193        catch (CascadingRuntimeException e)
194        {
195            // can happen if no request
196            // return empty list
197            return Collections.emptyList();
198        }
199    }
200    
201    private List<Future> _getFuturesForRequest() throws CascadingRuntimeException
202    {
203        Request request = ContextHelper.getRequest(_avalonContext);
204        @SuppressWarnings("unchecked")
205        List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
206        if (futuresForCurrentRequest == null)
207        {
208            futuresForCurrentRequest = new ArrayList<>();
209            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
210        }
211        return futuresForCurrentRequest;
212    }
213    
214    /**
215     * For all events of the given event id which will be notified during the current request, add an additional argument
216     * @param eventIds The event ids
217     * @param argName The name of the additional argument
218     * @param argValue The value of the additional argument
219     */
220    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
221    {
222        Request request = ContextHelper.getRequest(_avalonContext);
223        
224        // Map eventId -> argName -> argValue
225        @SuppressWarnings("unchecked")
226        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
227        if (additionalArgsForEvents == null)
228        {
229            additionalArgsForEvents = new HashMap<>();
230            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
231        }
232        
233        for (String eventId : eventIds)
234        {
235            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
236            if (additionalArgs == null)
237            {
238                additionalArgs = new HashMap<>();
239                additionalArgsForEvents.put(eventId, additionalArgs);
240            }
241            
242            additionalArgs.put(argName, argValue);
243        }
244    }
245    
246    /**
247     * For all events of the given event id which will be notified during the current request, removes the additional argument
248     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
249     * @param eventIds The event ids
250     * @param argName The name of the additional argument to remove
251     */
252    public void removeArgumentForEvents(String[] eventIds, String argName)
253    {
254        Request request = ContextHelper.getRequest(_avalonContext);
255        
256        // Map eventId -> argName -> argValue
257        @SuppressWarnings("unchecked")
258        Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
259        if (additionalArgsForEvents == null)
260        {
261            return;
262        }
263        
264        for (String eventId : eventIds)
265        {
266            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
267            if (additionalArgs == null)
268            {
269                break;
270            }
271            
272            additionalArgs.remove(argName);
273        }
274    }
275    
276    private void _putAdditionalArgs(final Event event)
277    {
278        String eventId = event.getId();
279        
280        Request request = null;
281        try
282        {
283            request = ContextHelper.getRequest(_avalonContext);
284        }
285        catch (CascadingRuntimeException e)
286        {
287            // can happen if no request
288            // return empty list
289        }
290        if (request != null)
291        {
292            @SuppressWarnings("unchecked")
293            Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
294            if (additionalArgsForEvents != null)
295            {
296                Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
297                if (additionalArgs != null)
298                {
299                    event.getArguments().putAll(additionalArgs);
300                }
301            }
302        }
303    }
304    
305    /**
306     * Observes the event
307     * @param event The event
308     * @param supportedObservers list of observers
309     * @param result The result object to fill 
310     * @throws Exception on error
311     */
312    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
313    {
314        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
315        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
316        
317        Map<String, Object> transientVars = new HashMap<>();
318        for (Observer supportedObserver : supportedObservers)
319        {
320            if (getLogger().isInfoEnabled())
321            {
322                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
323            }
324            
325            // Observe current event
326            if (supportedObserver instanceof AsyncObserver)
327            {
328                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
329                boolean parallelizable = asyncObserver.parallelizable();
330                
331                if (getLogger().isDebugEnabled())
332                {
333                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
334                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
335                }
336                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
337                {
338                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
339                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
340                }
341                
342                if (parallelizable)
343                {
344                    parallelizableAsyncObservers.add(asyncObserver);
345                }
346                else
347                {
348                    nonParallelizableAsyncObservers.add(asyncObserver);
349                }
350            }
351            else
352            {
353                try
354                {
355                    supportedObserver.observe(event, transientVars);
356                }
357                catch (Exception e)
358                {
359                    // Observation process should continue
360                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
361                }
362            }
363        }
364        
365        // Observe for async observer.
366        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
367        {
368            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
369        }
370    }
371
372    /**
373     * Async observe through a thread pool
374     * @param parallelObservers parallelizable asynchronous observers
375     * @param nonParallelObservers non parallelizable asynchronous observers
376     * @param event The event to observe
377     * @param transientVars The observer transient vars
378     * @param result The future results
379     */
380    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
381    {
382        if (__SINGLE_THREAD_EXECUTOR == null)
383        {
384            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
385            
386            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
387            
388            // 10 threads per core
389            __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS,
390                    new LinkedBlockingQueue<Runnable>(), threadFactory);
391        }
392        
393        if (!parallelObservers.isEmpty())
394        {
395            for (AsyncObserver observer :  parallelObservers)
396            {
397                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
398                result.add(future);
399            }
400        }
401        
402        if (!nonParallelObservers.isEmpty())
403        {
404            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
405            result.add(future);
406        }
407    }
408    
409    /**
410     * Runnable to be used for asynchronous calls 
411     */
412    abstract class AbstractAsyncObserve implements Callable<Object>
413    {
414        /** event to observe */
415        protected final Event _event;
416        protected final Map<String, Object> _transientVars;
417        protected final org.apache.avalon.framework.logger.Logger _logger;
418        protected final Logger _allEventLogger;
419        
420        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
421        {
422            _event = event;
423            _transientVars = transientVars;
424            _logger = logger;
425            _allEventLogger = allEventLogger;
426        }
427        
428        @Override
429        public Object call()
430        {
431            Map<String, Object> environmentInformation = null;
432            Integer computedResult = 0;
433            try
434            {
435                // Create the environment.
436                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
437                
438                _observe();
439            }
440            catch (Exception e)
441            {
442                // Observer must never fail, so just log the error
443                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
444                computedResult = -1;
445            }
446            finally
447            {
448                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
449                
450                // Leave the environment.
451                if (environmentInformation != null)
452                {
453                    BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
454                }
455            }
456            
457            return computedResult;
458        }
459        
460        /**
461         * Abstract observe method where the observation should be done
462         * @throws Exception on error
463         */
464        protected abstract void _observe() throws Exception;
465    }
466    
467    /**
468     * Runnable for parallel observers 
469     */
470    class ParallelAsyncObserve extends AbstractAsyncObserve
471    {
472        private final AsyncObserver _observer;
473        
474        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
475        {
476            super(event, transientVars, logger, allEventLogger);
477            _observer = observer;
478        }
479        
480        @Override
481        protected void _observe() throws Exception
482        {
483            if (_logger.isDebugEnabled())
484            {
485                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
486            }
487            if (_allEventLogger.isDebugEnabled())
488            {
489                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
490            }
491            
492            _observer.observe(_event, _transientVars);
493        }
494    }
495    
496    /**
497     * Runnable for non parallel observers 
498     */
499    class NonParallelAsyncObserve extends AbstractAsyncObserve
500    {
501        private final Collection<AsyncObserver> _observers;
502        
503        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
504        {
505            super(event, transientVars, logger, allEventLogger);
506            _observers = observers;
507        }
508        
509        @Override
510        protected void _observe() throws Exception
511        {
512            for (AsyncObserver observer : _observers)
513            {
514                if (_logger.isDebugEnabled())
515                {
516                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
517                }
518                if (_allEventLogger.isDebugEnabled())
519                {
520                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
521                }
522                
523                observer.observe(_event, _transientVars);
524            }
525        }
526    }
527    
528    /**
529     * Thread factory for async observers.
530     * Set the thread name format and marks the thread as daemon. 
531     */
532    static class AsyncObserveThreadFactory implements ThreadFactory
533    {
534        private static ThreadFactory _defaultThreadFactory;
535        private static String _nameFormat;
536        private static AtomicLong _count;
537        
538        public AsyncObserveThreadFactory()
539        {
540            _defaultThreadFactory = Executors.defaultThreadFactory();
541            _nameFormat = "ametys-async-observe-%d";
542            _count = new AtomicLong(0);
543        }
544        
545        public Thread newThread(Runnable r)
546        {
547            Thread thread = _defaultThreadFactory.newThread(r);
548            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
549            thread.setDaemon(true);
550            
551            return thread;
552        }
553    }
554    
555    @Override
556    public void dispose()
557    {
558        if (__SINGLE_THREAD_EXECUTOR != null)
559        {
560            __SINGLE_THREAD_EXECUTOR.shutdownNow();
561            __SINGLE_THREAD_EXECUTOR = null;
562        }
563        if (__PARALLEL_THREAD_EXECUTOR != null)
564        {
565            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
566            __PARALLEL_THREAD_EXECUTOR = null;
567        }
568        
569        _context = null;
570        _manager = null;
571    }
572
573    /**
574     * Registers an {@link Observer}.
575     * @param observer the {@link Observer}.
576     */
577    public void registerObserver(Observer observer)
578    {
579        _registeredObservers.add(observer);
580    }
581    
582    /**
583     * Unregisters an {@link Observer}.
584     * @param observer the {@link Observer}.
585     */
586    public void unregisterObserver(Observer observer)
587    {
588        _registeredObservers.remove(observer);
589    }
590}