001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.avalon.framework.activity.Disposable; 036import org.apache.avalon.framework.component.Component; 037import org.apache.avalon.framework.context.ContextException; 038import org.apache.avalon.framework.context.Contextualizable; 039import org.apache.avalon.framework.logger.AbstractLogEnabled; 040import org.apache.avalon.framework.service.ServiceException; 041import org.apache.avalon.framework.service.ServiceManager; 042import org.apache.avalon.framework.service.Serviceable; 043import org.apache.cocoon.Constants; 044import org.apache.cocoon.environment.Context; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.ametys.core.engine.BackgroundEngineHelper; 049 050/** 051 * Manager for dispatching {@link Event} instances to {@link Observer}s. 052 */ 053public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 054{ 055 /** Avalon ROLE. */ 056 public static final String ROLE = ObservationManager.class.getName(); 057 058 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 059 060 /** 061 * The executor service managing the single thread pool. This threads is 062 * used to run non-parallelizable observers. 063 */ 064 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 065 066 /** 067 * The executor service managing the thread pool of asynchronous observers 068 * allowed to run in parallel. 069 */ 070 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 071 072 /** Cocoon context */ 073 protected Context _context; 074 075 /** Service manager */ 076 protected ServiceManager _manager; 077 078 private ObserverExtensionPoint _observerExtPt; 079 private Collection<Observer> _registeredObservers = new ArrayList<>(); 080 081 @Override 082 public void service(ServiceManager manager) throws ServiceException 083 { 084 _manager = manager; 085 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 086 } 087 088 @Override 089 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 090 { 091 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 092 } 093 094 /** 095 * Notify of a event which will be dispatch to registered 096 * observers. 097 * @param event the event to notify. 098 * @return The {@link Future} objects of the asynchronous observers 099 */ 100 public List<Future> notify(final Event event) 101 { 102 List<Future> result = new ArrayList<>(); 103 104 try 105 { 106 if (getLogger().isDebugEnabled()) 107 { 108 getLogger().debug("Receiving " + event); 109 } 110 111 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 112 { 113 __ALL_EVENTS_LOGGER.info("Receiving " + event); 114 } 115 116 List<Observer> supportedObservers = new ArrayList<>(); 117 118 // Retrieve supported observers 119 for (String observerId : _observerExtPt.getExtensionsIds()) 120 { 121 Observer observer = _observerExtPt.getExtension(observerId); 122 123 if (getLogger().isDebugEnabled()) 124 { 125 getLogger().debug("Checking support for event: " + event + " and observer: " + observer); 126 } 127 128 if (observer.supports(event)) 129 { 130 if (getLogger().isDebugEnabled()) 131 { 132 getLogger().debug("Event: " + event + " supported for observer: " + observer); 133 } 134 135 supportedObservers.add(observer); 136 } 137 } 138 139 // Order observers (0 is first, Integer.MAX_INT is last) 140 Collections.sort(supportedObservers, new Comparator<Observer>() 141 { 142 @Override 143 public int compare(Observer o1, Observer o2) 144 { 145 return new Integer(o1.getPriority(event)).compareTo(new Integer(o2.getPriority(event))); 146 } 147 148 }); 149 150 // Observes the event and prepares the asynchronous observes. 151 _observesEvent(event, supportedObservers, result); 152 } 153 catch (Exception e) 154 { 155 // Observers should never fail, so just log the error 156 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 157 } 158 159 return result; 160 } 161 162 /** 163 * Observes the event 164 * @param event The event 165 * @param supportedObservers list of observers 166 * @param result The result object to fill 167 * @throws Exception on error 168 */ 169 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 170 { 171 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 172 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 173 174 Map<String, Object> transientVars = new HashMap<>(); 175 for (Observer supportedObserver : supportedObservers) 176 { 177 if (getLogger().isInfoEnabled()) 178 { 179 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 180 } 181 182 // Observe current event 183 if (supportedObserver instanceof AsyncObserver) 184 { 185 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 186 boolean parallelizable = asyncObserver.parallelizable(); 187 188 if (getLogger().isDebugEnabled()) 189 { 190 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 191 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 192 } 193 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 194 { 195 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 196 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 197 } 198 199 if (parallelizable) 200 { 201 parallelizableAsyncObservers.add(asyncObserver); 202 } 203 else 204 { 205 nonParallelizableAsyncObservers.add(asyncObserver); 206 } 207 } 208 else 209 { 210 supportedObserver.observe(event, transientVars); 211 } 212 } 213 214 // Observe for async observer. 215 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 216 { 217 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 218 } 219 } 220 221 /** 222 * Async observe through a thread pool 223 * @param parallelObservers parallelizable asynchronous observers 224 * @param nonParallelObservers non parallelizable asynchronous observers 225 * @param event The event to observe 226 * @param transientVars The observer transient vars 227 * @param result The future results 228 */ 229 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 230 { 231 if (__SINGLE_THREAD_EXECUTOR == null) 232 { 233 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 234 235 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 236 237 // 10 threads per core 238 __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS, 239 new LinkedBlockingQueue<Runnable>(), threadFactory); 240 } 241 242 if (!parallelObservers.isEmpty()) 243 { 244 for (AsyncObserver observer : parallelObservers) 245 { 246 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 247 result.add(future); 248 } 249 } 250 251 if (!nonParallelObservers.isEmpty()) 252 { 253 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 254 result.add(future); 255 } 256 } 257 258 /** 259 * Runnable to be used for asynchronous calls 260 */ 261 abstract class AbstractAsyncObserve implements Callable<Object> 262 { 263 /** event to observe */ 264 protected final Event _event; 265 protected final Map<String, Object> _transientVars; 266 protected final org.apache.avalon.framework.logger.Logger _logger; 267 protected final Logger _allEventLogger; 268 269 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 270 { 271 _event = event; 272 _transientVars = transientVars; 273 _logger = logger; 274 _allEventLogger = allEventLogger; 275 } 276 277 @Override 278 public Object call() 279 { 280 Map<String, Object> environmentInformation = null; 281 Integer computedResult = 0; 282 try 283 { 284 // Create the environment. 285 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 286 287 _observe(); 288 } 289 catch (Exception e) 290 { 291 // Observer must never fail, so just log the error 292 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 293 computedResult = -1; 294 } 295 finally 296 { 297 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 298 299 // Leave the environment. 300 if (environmentInformation != null) 301 { 302 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 303 } 304 } 305 306 return computedResult; 307 } 308 309 /** 310 * Abstract observe method where the observation should be done 311 * @throws Exception on error 312 */ 313 protected abstract void _observe() throws Exception; 314 } 315 316 /** 317 * Runnable for parallel observers 318 */ 319 class ParallelAsyncObserve extends AbstractAsyncObserve 320 { 321 private final AsyncObserver _observer; 322 323 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 324 { 325 super(event, transientVars, logger, allEventLogger); 326 _observer = observer; 327 } 328 329 @Override 330 protected void _observe() throws Exception 331 { 332 if (_logger.isDebugEnabled()) 333 { 334 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 335 } 336 if (_allEventLogger.isDebugEnabled()) 337 { 338 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 339 } 340 341 _observer.observe(_event, _transientVars); 342 } 343 } 344 345 /** 346 * Runnable for non parallel observers 347 */ 348 class NonParallelAsyncObserve extends AbstractAsyncObserve 349 { 350 private final Collection<AsyncObserver> _observers; 351 352 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 353 { 354 super(event, transientVars, logger, allEventLogger); 355 _observers = observers; 356 } 357 358 @Override 359 protected void _observe() throws Exception 360 { 361 for (AsyncObserver observer : _observers) 362 { 363 if (_logger.isDebugEnabled()) 364 { 365 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 366 } 367 if (_allEventLogger.isDebugEnabled()) 368 { 369 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 370 } 371 372 observer.observe(_event, _transientVars); 373 } 374 } 375 } 376 377 /** 378 * Thread factory for async observers. 379 * Set the thread name format and marks the thread as daemon. 380 */ 381 static class AsyncObserveThreadFactory implements ThreadFactory 382 { 383 private static ThreadFactory _defaultThreadFactory; 384 private static String _nameFormat; 385 private static AtomicLong _count; 386 387 public AsyncObserveThreadFactory() 388 { 389 _defaultThreadFactory = Executors.defaultThreadFactory(); 390 _nameFormat = "ametys-async-observe-%d"; 391 _count = new AtomicLong(0); 392 } 393 394 public Thread newThread(Runnable r) 395 { 396 Thread thread = _defaultThreadFactory.newThread(r); 397 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 398 thread.setDaemon(true); 399 400 return thread; 401 } 402 } 403 404 @Override 405 public void dispose() 406 { 407 if (__SINGLE_THREAD_EXECUTOR != null) 408 { 409 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 410 __SINGLE_THREAD_EXECUTOR = null; 411 } 412 if (__PARALLEL_THREAD_EXECUTOR != null) 413 { 414 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 415 __PARALLEL_THREAD_EXECUTOR = null; 416 } 417 418 _context = null; 419 _manager = null; 420 } 421 422 /** 423 * Registers an {@link Observer}. 424 * @param observer the {@link Observer}. 425 */ 426 public void registerObserver(Observer observer) 427 { 428 _registeredObservers.add(observer); 429 } 430 431 /** 432 * Unregisters an {@link Observer}. 433 * @param observer the {@link Observer}. 434 */ 435 public void unregisterObserver(Observer observer) 436 { 437 _registeredObservers.remove(observer); 438 } 439}