001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.avalon.framework.CascadingRuntimeException; 036import org.apache.avalon.framework.activity.Disposable; 037import org.apache.avalon.framework.component.Component; 038import org.apache.avalon.framework.context.ContextException; 039import org.apache.avalon.framework.context.Contextualizable; 040import org.apache.avalon.framework.logger.AbstractLogEnabled; 041import org.apache.avalon.framework.service.ServiceException; 042import org.apache.avalon.framework.service.ServiceManager; 043import org.apache.avalon.framework.service.Serviceable; 044import org.apache.cocoon.Constants; 045import org.apache.cocoon.components.ContextHelper; 046import org.apache.cocoon.environment.Context; 047import org.apache.cocoon.environment.Request; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.ametys.core.engine.BackgroundEngineHelper; 052 053/** 054 * Manager for dispatching {@link Event} instances to {@link Observer}s. 055 */ 056public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 057{ 058 /** Avalon ROLE. */ 059 public static final String ROLE = ObservationManager.class.getName(); 060 061 private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures"; 062 private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args"; 063 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 064 065 /** 066 * The executor service managing the single thread pool. This threads is 067 * used to run non-parallelizable observers. 068 */ 069 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 070 071 /** 072 * The executor service managing the thread pool of asynchronous observers 073 * allowed to run in parallel. 074 */ 075 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 076 077 /** Cocoon context */ 078 protected Context _context; 079 080 /** Service manager */ 081 protected ServiceManager _manager; 082 083 private org.apache.avalon.framework.context.Context _avalonContext; 084 private ObserverExtensionPoint _observerExtPt; 085 private Collection<Observer> _registeredObservers = new ArrayList<>(); 086 087 @Override 088 public void service(ServiceManager manager) throws ServiceException 089 { 090 _manager = manager; 091 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 092 } 093 094 @Override 095 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 096 { 097 _avalonContext = context; 098 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 099 } 100 101 /** 102 * Notify of a event which will be dispatch to registered 103 * observers. 104 * @param event the event to notify. 105 * @return The {@link Future} objects of the asynchronous observers 106 */ 107 public List<Future> notify(final Event event) 108 { 109 List<Future> result = new ArrayList<>(); 110 111 try 112 { 113 if (getLogger().isDebugEnabled()) 114 { 115 getLogger().debug("Receiving " + event); 116 } 117 118 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 119 { 120 __ALL_EVENTS_LOGGER.info("Receiving " + event); 121 } 122 123 List<Observer> supportedObservers = new ArrayList<>(); 124 125 // Retrieve supported observers 126 for (String observerId : _observerExtPt.getExtensionsIds()) 127 { 128 Observer observer = _observerExtPt.getExtension(observerId); 129 130 if (getLogger().isDebugEnabled()) 131 { 132 getLogger().debug("Checking support for event: " + event + " and observer: " + observer); 133 } 134 135 if (observer.supports(event)) 136 { 137 if (getLogger().isDebugEnabled()) 138 { 139 getLogger().debug("Event: " + event + " supported for observer: " + observer); 140 } 141 142 supportedObservers.add(observer); 143 } 144 } 145 146 // Order observers (0 is first, Integer.MAX_INT is last) 147 Collections.sort(supportedObservers, new Comparator<Observer>() 148 { 149 @Override 150 public int compare(Observer o1, Observer o2) 151 { 152 return new Integer(o1.getPriority(event)).compareTo(new Integer(o2.getPriority(event))); 153 } 154 155 }); 156 157 // Observes the event and prepares the asynchronous observes. 158 _putAdditionalArgs(event); 159 _observesEvent(event, supportedObservers, result); 160 } 161 catch (Exception e) 162 { 163 // Observers should never fail, so just log the error 164 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 165 } 166 167 // Keep in current request the futures of the curent request 168 try 169 { 170 List<Future> futuresForCurrentRequest = _getFuturesForRequest(); 171 futuresForCurrentRequest.addAll(result); 172 } 173 catch (CascadingRuntimeException e) 174 { 175 // can happen if no request 176 // do nothing though 177 } 178 179 return result; 180 } 181 182 /** 183 * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request. 184 * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work. 185 * @return the {@link Future}s of the notified observers during the current request. 186 */ 187 public List<Future> getFuturesForRequest() 188 { 189 try 190 { 191 return Collections.unmodifiableList(_getFuturesForRequest()); 192 } 193 catch (CascadingRuntimeException e) 194 { 195 // can happen if no request 196 // return empty list 197 return Collections.emptyList(); 198 } 199 } 200 201 private List<Future> _getFuturesForRequest() throws CascadingRuntimeException 202 { 203 Request request = ContextHelper.getRequest(_avalonContext); 204 @SuppressWarnings("unchecked") 205 List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME); 206 if (futuresForCurrentRequest == null) 207 { 208 futuresForCurrentRequest = new ArrayList<>(); 209 request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest); 210 } 211 return futuresForCurrentRequest; 212 } 213 214 /** 215 * For all events of the given event id which will be notified during the current request, add an additional argument 216 * @param eventIds The event ids 217 * @param argName The name of the additional argument 218 * @param argValue The value of the additional argument 219 */ 220 public void addArgumentForEvents(String[] eventIds, String argName, Object argValue) 221 { 222 Request request = ContextHelper.getRequest(_avalonContext); 223 224 // Map eventId -> argName -> argValue 225 @SuppressWarnings("unchecked") 226 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 227 if (additionalArgsForEvents == null) 228 { 229 additionalArgsForEvents = new HashMap<>(); 230 request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents); 231 } 232 233 for (String eventId : eventIds) 234 { 235 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 236 if (additionalArgs == null) 237 { 238 additionalArgs = new HashMap<>(); 239 additionalArgsForEvents.put(eventId, additionalArgs); 240 } 241 242 additionalArgs.put(argName, argValue); 243 } 244 } 245 246 /** 247 * For all events of the given event id which will be notified during the current request, removes the additional argument 248 * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method. 249 * @param eventIds The event ids 250 * @param argName The name of the additional argument to remove 251 */ 252 public void removeArgumentForEvents(String[] eventIds, String argName) 253 { 254 Request request = ContextHelper.getRequest(_avalonContext); 255 256 // Map eventId -> argName -> argValue 257 @SuppressWarnings("unchecked") 258 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 259 if (additionalArgsForEvents == null) 260 { 261 return; 262 } 263 264 for (String eventId : eventIds) 265 { 266 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 267 if (additionalArgs == null) 268 { 269 break; 270 } 271 272 additionalArgs.remove(argName); 273 } 274 } 275 276 private void _putAdditionalArgs(final Event event) 277 { 278 String eventId = event.getId(); 279 280 Request request = null; 281 try 282 { 283 request = ContextHelper.getRequest(_avalonContext); 284 } 285 catch (CascadingRuntimeException e) 286 { 287 // can happen if no request 288 // return empty list 289 } 290 if (request != null) 291 { 292 @SuppressWarnings("unchecked") 293 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 294 if (additionalArgsForEvents != null) 295 { 296 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 297 if (additionalArgs != null) 298 { 299 event.getArguments().putAll(additionalArgs); 300 } 301 } 302 } 303 } 304 305 /** 306 * Observes the event 307 * @param event The event 308 * @param supportedObservers list of observers 309 * @param result The result object to fill 310 * @throws Exception on error 311 */ 312 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 313 { 314 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 315 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 316 317 Map<String, Object> transientVars = new HashMap<>(); 318 for (Observer supportedObserver : supportedObservers) 319 { 320 if (getLogger().isInfoEnabled()) 321 { 322 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 323 } 324 325 // Observe current event 326 if (supportedObserver instanceof AsyncObserver) 327 { 328 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 329 boolean parallelizable = asyncObserver.parallelizable(); 330 331 if (getLogger().isDebugEnabled()) 332 { 333 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 334 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 335 } 336 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 337 { 338 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 339 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 340 } 341 342 if (parallelizable) 343 { 344 parallelizableAsyncObservers.add(asyncObserver); 345 } 346 else 347 { 348 nonParallelizableAsyncObservers.add(asyncObserver); 349 } 350 } 351 else 352 { 353 try 354 { 355 supportedObserver.observe(event, transientVars); 356 } 357 catch (Exception e) 358 { 359 // Observation process should continue 360 getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e); 361 } 362 } 363 } 364 365 // Observe for async observer. 366 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 367 { 368 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 369 } 370 } 371 372 /** 373 * Async observe through a thread pool 374 * @param parallelObservers parallelizable asynchronous observers 375 * @param nonParallelObservers non parallelizable asynchronous observers 376 * @param event The event to observe 377 * @param transientVars The observer transient vars 378 * @param result The future results 379 */ 380 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 381 { 382 if (__SINGLE_THREAD_EXECUTOR == null) 383 { 384 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 385 386 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 387 388 // 10 threads per core 389 __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS, 390 new LinkedBlockingQueue<Runnable>(), threadFactory); 391 } 392 393 if (!parallelObservers.isEmpty()) 394 { 395 for (AsyncObserver observer : parallelObservers) 396 { 397 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 398 result.add(future); 399 } 400 } 401 402 if (!nonParallelObservers.isEmpty()) 403 { 404 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 405 result.add(future); 406 } 407 } 408 409 /** 410 * Runnable to be used for asynchronous calls 411 */ 412 abstract class AbstractAsyncObserve implements Callable<Object> 413 { 414 /** event to observe */ 415 protected final Event _event; 416 protected final Map<String, Object> _transientVars; 417 protected final org.apache.avalon.framework.logger.Logger _logger; 418 protected final Logger _allEventLogger; 419 420 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 421 { 422 _event = event; 423 _transientVars = transientVars; 424 _logger = logger; 425 _allEventLogger = allEventLogger; 426 } 427 428 @Override 429 public Object call() 430 { 431 Map<String, Object> environmentInformation = null; 432 Integer computedResult = 0; 433 try 434 { 435 // Create the environment. 436 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 437 438 _observe(); 439 } 440 catch (Exception e) 441 { 442 // Observer must never fail, so just log the error 443 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 444 computedResult = -1; 445 } 446 finally 447 { 448 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 449 450 // Leave the environment. 451 if (environmentInformation != null) 452 { 453 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 454 } 455 } 456 457 return computedResult; 458 } 459 460 /** 461 * Abstract observe method where the observation should be done 462 * @throws Exception on error 463 */ 464 protected abstract void _observe() throws Exception; 465 } 466 467 /** 468 * Runnable for parallel observers 469 */ 470 class ParallelAsyncObserve extends AbstractAsyncObserve 471 { 472 private final AsyncObserver _observer; 473 474 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 475 { 476 super(event, transientVars, logger, allEventLogger); 477 _observer = observer; 478 } 479 480 @Override 481 protected void _observe() throws Exception 482 { 483 if (_logger.isDebugEnabled()) 484 { 485 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 486 } 487 if (_allEventLogger.isDebugEnabled()) 488 { 489 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 490 } 491 492 _observer.observe(_event, _transientVars); 493 } 494 } 495 496 /** 497 * Runnable for non parallel observers 498 */ 499 class NonParallelAsyncObserve extends AbstractAsyncObserve 500 { 501 private final Collection<AsyncObserver> _observers; 502 503 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 504 { 505 super(event, transientVars, logger, allEventLogger); 506 _observers = observers; 507 } 508 509 @Override 510 protected void _observe() throws Exception 511 { 512 for (AsyncObserver observer : _observers) 513 { 514 if (_logger.isDebugEnabled()) 515 { 516 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 517 } 518 if (_allEventLogger.isDebugEnabled()) 519 { 520 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 521 } 522 523 observer.observe(_event, _transientVars); 524 } 525 } 526 } 527 528 /** 529 * Thread factory for async observers. 530 * Set the thread name format and marks the thread as daemon. 531 */ 532 static class AsyncObserveThreadFactory implements ThreadFactory 533 { 534 private static ThreadFactory _defaultThreadFactory; 535 private static String _nameFormat; 536 private static AtomicLong _count; 537 538 public AsyncObserveThreadFactory() 539 { 540 _defaultThreadFactory = Executors.defaultThreadFactory(); 541 _nameFormat = "ametys-async-observe-%d"; 542 _count = new AtomicLong(0); 543 } 544 545 public Thread newThread(Runnable r) 546 { 547 Thread thread = _defaultThreadFactory.newThread(r); 548 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 549 thread.setDaemon(true); 550 551 return thread; 552 } 553 } 554 555 @Override 556 public void dispose() 557 { 558 if (__SINGLE_THREAD_EXECUTOR != null) 559 { 560 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 561 __SINGLE_THREAD_EXECUTOR = null; 562 } 563 if (__PARALLEL_THREAD_EXECUTOR != null) 564 { 565 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 566 __PARALLEL_THREAD_EXECUTOR = null; 567 } 568 569 _context = null; 570 _manager = null; 571 } 572 573 /** 574 * Registers an {@link Observer}. 575 * @param observer the {@link Observer}. 576 */ 577 public void registerObserver(Observer observer) 578 { 579 _registeredObservers.add(observer); 580 } 581 582 /** 583 * Unregisters an {@link Observer}. 584 * @param observer the {@link Observer}. 585 */ 586 public void unregisterObserver(Observer observer) 587 { 588 _registeredObservers.remove(observer); 589 } 590}