001/*
002 *  Copyright 2013 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.core.observation;
017
018import java.util.ArrayList;
019import java.util.Collection;
020import java.util.Collections;
021import java.util.Comparator;
022import java.util.HashMap;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.Callable;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.LinkedBlockingQueue;
030import java.util.concurrent.ThreadFactory;
031import java.util.concurrent.ThreadPoolExecutor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.avalon.framework.activity.Disposable;
036import org.apache.avalon.framework.component.Component;
037import org.apache.avalon.framework.context.ContextException;
038import org.apache.avalon.framework.context.Contextualizable;
039import org.apache.avalon.framework.logger.AbstractLogEnabled;
040import org.apache.avalon.framework.service.ServiceException;
041import org.apache.avalon.framework.service.ServiceManager;
042import org.apache.avalon.framework.service.Serviceable;
043import org.apache.cocoon.Constants;
044import org.apache.cocoon.environment.Context;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048import org.ametys.core.engine.BackgroundEngineHelper;
049
050/**
051 * Manager for dispatching {@link Event} instances to {@link Observer}s.
052 */
053public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
054{
055    /** Avalon ROLE. */
056    public static final String ROLE = ObservationManager.class.getName();
057    
058    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
059    
060    /**
061     * The executor service managing the single thread pool. This threads is
062     * used to run non-parallelizable observers.
063     */
064    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
065    
066    /**
067     * The executor service managing the thread pool of asynchronous observers
068     * allowed to run in parallel.
069     */
070    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
071    
072    /** Cocoon context */
073    protected Context _context;
074    
075    /** Service manager */
076    protected ServiceManager _manager;
077    
078    private ObserverExtensionPoint _observerExtPt;
079    private Collection<Observer> _registeredObservers = new ArrayList<>();
080    
081    @Override
082    public void service(ServiceManager manager) throws ServiceException
083    {
084        _manager = manager;
085        _observerExtPt  = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
086    }
087    
088    @Override
089    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
090    {
091        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
092    }
093    
094    /**
095     * Notify of a event which will be dispatch to registered
096     * observers.
097     * @param event the event to notify.
098     * @return The {@link Future} objects of the asynchronous observers
099     */
100    public List<Future> notify(final Event event)
101    {
102        List<Future> result = new ArrayList<>();
103        
104        try
105        {
106            if (getLogger().isDebugEnabled())
107            {
108                getLogger().debug("Receiving " + event);
109            }
110            
111            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
112            {
113                __ALL_EVENTS_LOGGER.info("Receiving " + event);
114            }
115            
116            List<Observer> supportedObservers = new ArrayList<>();
117            
118            // Retrieve supported observers
119            for (String observerId : _observerExtPt.getExtensionsIds())
120            {
121                Observer observer = _observerExtPt.getExtension(observerId);
122                
123                if (getLogger().isDebugEnabled())
124                {
125                    getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
126                }
127                
128                if (observer.supports(event))
129                {
130                    if (getLogger().isDebugEnabled())
131                    {
132                        getLogger().debug("Event: " + event + " supported for observer: " + observer);
133                    }
134                    
135                    supportedObservers.add(observer);
136                }
137            }
138            
139            // Order observers (0 is first, Integer.MAX_INT is last)
140            Collections.sort(supportedObservers, new Comparator<Observer>()
141            {
142                @Override
143                public int compare(Observer o1, Observer o2)
144                {
145                    return new Integer(o1.getPriority(event)).compareTo(new Integer(o2.getPriority(event)));
146                }
147        
148            });
149            
150            // Observes the event and prepares the asynchronous observes.
151            _observesEvent(event, supportedObservers, result);
152        }
153        catch (Exception e)
154        {
155            // Observers should never fail, so just log the error
156            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
157        }
158        
159        return result;
160    }
161    
162    /**
163     * Observes the event
164     * @param event The event
165     * @param supportedObservers list of observers
166     * @param result The result object to fill 
167     * @throws Exception on error
168     */
169    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
170    {
171        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
172        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
173        
174        Map<String, Object> transientVars = new HashMap<>();
175        for (Observer supportedObserver : supportedObservers)
176        {
177            if (getLogger().isInfoEnabled())
178            {
179                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
180            }
181            
182            // Observe current event
183            if (supportedObserver instanceof AsyncObserver)
184            {
185                AsyncObserver asyncObserver = (AsyncObserver) supportedObserver;
186                boolean parallelizable = asyncObserver.parallelizable();
187                
188                if (getLogger().isDebugEnabled())
189                {
190                    getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
191                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
192                }
193                if (__ALL_EVENTS_LOGGER.isDebugEnabled())
194                {
195                    __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
196                            parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event));
197                }
198                
199                if (parallelizable)
200                {
201                    parallelizableAsyncObservers.add(asyncObserver);
202                }
203                else
204                {
205                    nonParallelizableAsyncObservers.add(asyncObserver);
206                }
207            }
208            else
209            {
210                supportedObserver.observe(event, transientVars);
211            }
212        }
213        
214        // Observe for async observer.
215        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
216        {
217            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
218        }
219    }
220
221    /**
222     * Async observe through a thread pool
223     * @param parallelObservers parallelizable asynchronous observers
224     * @param nonParallelObservers non parallelizable asynchronous observers
225     * @param event The event to observe
226     * @param transientVars The observer transient vars
227     * @param result The future results
228     */
229    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
230    {
231        if (__SINGLE_THREAD_EXECUTOR == null)
232        {
233            AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory();
234            
235            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
236            
237            // 10 threads per core
238            __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS,
239                    new LinkedBlockingQueue<Runnable>(), threadFactory);
240        }
241        
242        if (!parallelObservers.isEmpty())
243        {
244            for (AsyncObserver observer :  parallelObservers)
245            {
246                Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
247                result.add(future);
248            }
249        }
250        
251        if (!nonParallelObservers.isEmpty())
252        {
253            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
254            result.add(future);
255        }
256    }
257    
258    /**
259     * Runnable to be used for asynchronous calls 
260     */
261    abstract class AbstractAsyncObserve implements Callable<Object>
262    {
263        /** event to observe */
264        protected final Event _event;
265        protected final Map<String, Object> _transientVars;
266        protected final org.apache.avalon.framework.logger.Logger _logger;
267        protected final Logger _allEventLogger;
268        
269        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
270        {
271            _event = event;
272            _transientVars = transientVars;
273            _logger = logger;
274            _allEventLogger = allEventLogger;
275        }
276        
277        @Override
278        public Object call()
279        {
280            Map<String, Object> environmentInformation = null;
281            Integer computedResult = 0;
282            try
283            {
284                // Create the environment.
285                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
286                
287                _observe();
288            }
289            catch (Exception e)
290            {
291                // Observer must never fail, so just log the error
292                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
293                computedResult = -1;
294            }
295            finally
296            {
297                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
298                
299                // Leave the environment.
300                if (environmentInformation != null)
301                {
302                    BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
303                }
304            }
305            
306            return computedResult;
307        }
308        
309        /**
310         * Abstract observe method where the observation should be done
311         * @throws Exception on error
312         */
313        protected abstract void _observe() throws Exception;
314    }
315    
316    /**
317     * Runnable for parallel observers 
318     */
319    class ParallelAsyncObserve extends AbstractAsyncObserve
320    {
321        private final AsyncObserver _observer;
322        
323        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
324        {
325            super(event, transientVars, logger, allEventLogger);
326            _observer = observer;
327        }
328        
329        @Override
330        protected void _observe() throws Exception
331        {
332            if (_logger.isDebugEnabled())
333            {
334                _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
335            }
336            if (_allEventLogger.isDebugEnabled())
337            {
338                _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + ".");
339            }
340            
341            _observer.observe(_event, _transientVars);
342        }
343    }
344    
345    /**
346     * Runnable for non parallel observers 
347     */
348    class NonParallelAsyncObserve extends AbstractAsyncObserve
349    {
350        private final Collection<AsyncObserver> _observers;
351        
352        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
353        {
354            super(event, transientVars, logger, allEventLogger);
355            _observers = observers;
356        }
357        
358        @Override
359        protected void _observe() throws Exception
360        {
361            for (AsyncObserver observer : _observers)
362            {
363                if (_logger.isDebugEnabled())
364                {
365                    _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
366                }
367                if (_allEventLogger.isDebugEnabled())
368                {
369                    _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
370                }
371                
372                observer.observe(_event, _transientVars);
373            }
374        }
375    }
376    
377    /**
378     * Thread factory for async observers.
379     * Set the thread name format and marks the thread as daemon. 
380     */
381    static class AsyncObserveThreadFactory implements ThreadFactory
382    {
383        private static ThreadFactory _defaultThreadFactory;
384        private static String _nameFormat;
385        private static AtomicLong _count;
386        
387        public AsyncObserveThreadFactory()
388        {
389            _defaultThreadFactory = Executors.defaultThreadFactory();
390            _nameFormat = "ametys-async-observe-%d";
391            _count = new AtomicLong(0);
392        }
393        
394        public Thread newThread(Runnable r)
395        {
396            Thread thread = _defaultThreadFactory.newThread(r);
397            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
398            thread.setDaemon(true);
399            
400            return thread;
401        }
402    }
403    
404    @Override
405    public void dispose()
406    {
407        if (__SINGLE_THREAD_EXECUTOR != null)
408        {
409            __SINGLE_THREAD_EXECUTOR.shutdownNow();
410            __SINGLE_THREAD_EXECUTOR = null;
411        }
412        if (__PARALLEL_THREAD_EXECUTOR != null)
413        {
414            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
415            __PARALLEL_THREAD_EXECUTOR = null;
416        }
417        
418        _context = null;
419        _manager = null;
420    }
421
422    /**
423     * Registers an {@link Observer}.
424     * @param observer the {@link Observer}.
425     */
426    public void registerObserver(Observer observer)
427    {
428        _registeredObservers.add(observer);
429    }
430    
431    /**
432     * Unregisters an {@link Observer}.
433     * @param observer the {@link Observer}.
434     */
435    public void unregisterObserver(Observer observer)
436    {
437        _registeredObservers.remove(observer);
438    }
439}