001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.LinkedBlockingQueue; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.ThreadPoolExecutor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.avalon.framework.CascadingRuntimeException; 036import org.apache.avalon.framework.activity.Disposable; 037import org.apache.avalon.framework.component.Component; 038import org.apache.avalon.framework.context.ContextException; 039import org.apache.avalon.framework.context.Contextualizable; 040import org.apache.avalon.framework.logger.AbstractLogEnabled; 041import org.apache.avalon.framework.service.ServiceException; 042import org.apache.avalon.framework.service.ServiceManager; 043import org.apache.avalon.framework.service.Serviceable; 044import org.apache.cocoon.Constants; 045import org.apache.cocoon.components.ContextHelper; 046import org.apache.cocoon.environment.Context; 047import org.apache.cocoon.environment.Request; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051import org.ametys.core.engine.BackgroundEngineHelper; 052 053/** 054 * Manager for dispatching {@link Event} instances to {@link Observer}s. 055 */ 056public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 057{ 058 /** Avalon ROLE. */ 059 public static final String ROLE = ObservationManager.class.getName(); 060 061 private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures"; 062 private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args"; 063 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 064 065 /** 066 * The executor service managing the single thread pool. This threads is 067 * used to run non-parallelizable observers. 068 */ 069 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 070 071 /** 072 * The executor service managing the thread pool of asynchronous observers 073 * allowed to run in parallel. 074 */ 075 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 076 077 /** Cocoon context */ 078 protected Context _context; 079 080 /** Service manager */ 081 protected ServiceManager _manager; 082 083 private org.apache.avalon.framework.context.Context _avalonContext; 084 private ObserverExtensionPoint _observerExtPt; 085 private Collection<Observer> _registeredObservers = new ArrayList<>(); 086 087 @Override 088 public void service(ServiceManager manager) throws ServiceException 089 { 090 _manager = manager; 091 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 092 } 093 094 @Override 095 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 096 { 097 _avalonContext = context; 098 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 099 } 100 101 /** 102 * Notify of a event which will be dispatch to registered 103 * observers. 104 * @param event the event to notify. 105 * @return The {@link Future} objects of the asynchronous observers 106 */ 107 public List<Future> notify(final Event event) 108 { 109 List<Future> result = new ArrayList<>(); 110 111 try 112 { 113 if (getLogger().isDebugEnabled()) 114 { 115 getLogger().debug("Receiving " + event); 116 } 117 118 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 119 { 120 __ALL_EVENTS_LOGGER.info("Receiving " + event); 121 } 122 123 List<Observer> supportedObservers = new ArrayList<>(); 124 125 // Retrieve supported observers from extension point 126 for (String observerId : _observerExtPt.getExtensionsIds()) 127 { 128 Observer observer = _observerExtPt.getExtension(observerId); 129 _addSupportedObserver(supportedObservers, observer, event); 130 } 131 132 // Retrieve supported observers from manual registration 133 for (Observer observer : _registeredObservers) 134 { 135 _addSupportedObserver(supportedObservers, observer, event); 136 } 137 138 // Order observers (0 is first, Integer.MAX_INT is last) 139 Collections.sort(supportedObservers, Comparator.comparingInt( 140 observer -> observer.getPriority(event) 141 )); 142 143 // Observes the event and prepares the asynchronous observes. 144 _putAdditionalArgs(event); 145 _observesEvent(event, supportedObservers, result); 146 } 147 catch (Exception e) 148 { 149 // Observers should never fail, so just log the error 150 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 151 } 152 153 // Keep in current request the futures of the curent request 154 try 155 { 156 List<Future> futuresForCurrentRequest = _getFuturesForRequest(); 157 futuresForCurrentRequest.addAll(result); 158 } 159 catch (CascadingRuntimeException e) 160 { 161 // can happen if no request 162 // do nothing though 163 } 164 165 return result; 166 } 167 168 private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event) 169 { 170 if (getLogger().isDebugEnabled()) 171 { 172 getLogger().debug("Checking support for event: " + event + " and observer: " + observer); 173 } 174 175 if (observer.supports(event)) 176 { 177 if (getLogger().isDebugEnabled()) 178 { 179 getLogger().debug("Event: " + event + " supported for observer: " + observer); 180 } 181 182 supportedObservers.add(observer); 183 } 184 } 185 186 /** 187 * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request. 188 * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work. 189 * @return the {@link Future}s of the notified observers during the current request. 190 */ 191 public List<Future> getFuturesForRequest() 192 { 193 try 194 { 195 return Collections.unmodifiableList(_getFuturesForRequest()); 196 } 197 catch (CascadingRuntimeException e) 198 { 199 // can happen if no request 200 // return empty list 201 return Collections.emptyList(); 202 } 203 } 204 205 private List<Future> _getFuturesForRequest() throws CascadingRuntimeException 206 { 207 Request request = ContextHelper.getRequest(_avalonContext); 208 @SuppressWarnings("unchecked") 209 List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME); 210 if (futuresForCurrentRequest == null) 211 { 212 futuresForCurrentRequest = new ArrayList<>(); 213 request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest); 214 } 215 return futuresForCurrentRequest; 216 } 217 218 /** 219 * For all events of the given event id which will be notified during the current request, add an additional argument 220 * @param eventIds The event ids 221 * @param argName The name of the additional argument 222 * @param argValue The value of the additional argument 223 */ 224 public void addArgumentForEvents(String[] eventIds, String argName, Object argValue) 225 { 226 Request request = ContextHelper.getRequest(_avalonContext); 227 228 // Map eventId -> argName -> argValue 229 @SuppressWarnings("unchecked") 230 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 231 if (additionalArgsForEvents == null) 232 { 233 additionalArgsForEvents = new HashMap<>(); 234 request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents); 235 } 236 237 for (String eventId : eventIds) 238 { 239 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 240 if (additionalArgs == null) 241 { 242 additionalArgs = new HashMap<>(); 243 additionalArgsForEvents.put(eventId, additionalArgs); 244 } 245 246 additionalArgs.put(argName, argValue); 247 } 248 } 249 250 /** 251 * For all events of the given event id which will be notified during the current request, removes the additional argument 252 * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method. 253 * @param eventIds The event ids 254 * @param argName The name of the additional argument to remove 255 */ 256 public void removeArgumentForEvents(String[] eventIds, String argName) 257 { 258 Request request = ContextHelper.getRequest(_avalonContext); 259 260 // Map eventId -> argName -> argValue 261 @SuppressWarnings("unchecked") 262 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 263 if (additionalArgsForEvents == null) 264 { 265 return; 266 } 267 268 for (String eventId : eventIds) 269 { 270 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 271 if (additionalArgs == null) 272 { 273 break; 274 } 275 276 additionalArgs.remove(argName); 277 } 278 } 279 280 private void _putAdditionalArgs(final Event event) 281 { 282 String eventId = event.getId(); 283 284 Request request = null; 285 try 286 { 287 request = ContextHelper.getRequest(_avalonContext); 288 } 289 catch (CascadingRuntimeException e) 290 { 291 // can happen if no request 292 // return empty list 293 } 294 if (request != null) 295 { 296 @SuppressWarnings("unchecked") 297 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 298 if (additionalArgsForEvents != null) 299 { 300 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 301 if (additionalArgs != null) 302 { 303 event.getArguments().putAll(additionalArgs); 304 } 305 } 306 } 307 } 308 309 /** 310 * Observes the event 311 * @param event The event 312 * @param supportedObservers list of observers 313 * @param result The result object to fill 314 * @throws Exception on error 315 */ 316 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 317 { 318 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 319 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 320 321 Map<String, Object> transientVars = new HashMap<>(); 322 for (Observer supportedObserver : supportedObservers) 323 { 324 if (getLogger().isInfoEnabled()) 325 { 326 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 327 } 328 329 // Observe current event 330 if (supportedObserver instanceof AsyncObserver) 331 { 332 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 333 boolean parallelizable = asyncObserver.parallelizable(); 334 335 if (getLogger().isDebugEnabled()) 336 { 337 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 338 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 339 } 340 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 341 { 342 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 343 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 344 } 345 346 if (parallelizable) 347 { 348 parallelizableAsyncObservers.add(asyncObserver); 349 } 350 else 351 { 352 nonParallelizableAsyncObservers.add(asyncObserver); 353 } 354 } 355 else 356 { 357 try 358 { 359 supportedObserver.observe(event, transientVars); 360 } 361 catch (Throwable e) 362 { 363 // Observation process should continue 364 getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e); 365 } 366 } 367 } 368 369 // Observe for async observer. 370 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 371 { 372 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 373 } 374 } 375 376 /** 377 * Async observe through a thread pool 378 * @param parallelObservers parallelizable asynchronous observers 379 * @param nonParallelObservers non parallelizable asynchronous observers 380 * @param event The event to observe 381 * @param transientVars The observer transient vars 382 * @param result The future results 383 */ 384 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 385 { 386 if (__SINGLE_THREAD_EXECUTOR == null) 387 { 388 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 389 390 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 391 392 // 10 threads per core 393 __PARALLEL_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 10 * Runtime.getRuntime().availableProcessors(), 60L, TimeUnit.MILLISECONDS, 394 new LinkedBlockingQueue<Runnable>(), threadFactory); 395 } 396 397 if (!parallelObservers.isEmpty()) 398 { 399 for (AsyncObserver observer : parallelObservers) 400 { 401 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 402 result.add(future); 403 } 404 } 405 406 if (!nonParallelObservers.isEmpty()) 407 { 408 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 409 result.add(future); 410 } 411 } 412 413 /** 414 * Runnable to be used for asynchronous calls 415 */ 416 abstract class AbstractAsyncObserve implements Callable<Object> 417 { 418 /** event to observe */ 419 protected final Event _event; 420 protected final Map<String, Object> _transientVars; 421 protected final org.apache.avalon.framework.logger.Logger _logger; 422 protected final Logger _allEventLogger; 423 424 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 425 { 426 _event = event; 427 _transientVars = transientVars; 428 _logger = logger; 429 _allEventLogger = allEventLogger; 430 } 431 432 @Override 433 public Object call() 434 { 435 Map<String, Object> environmentInformation = null; 436 Integer computedResult = 0; 437 try 438 { 439 // Create the environment. 440 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 441 442 _observe(); 443 } 444 catch (Exception e) 445 { 446 // Observer must never fail, so just log the error 447 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 448 computedResult = -1; 449 } 450 finally 451 { 452 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 453 454 // Leave the environment. 455 if (environmentInformation != null) 456 { 457 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 458 } 459 } 460 461 return computedResult; 462 } 463 464 /** 465 * Abstract observe method where the observation should be done 466 * @throws Exception on error 467 */ 468 protected abstract void _observe() throws Exception; 469 } 470 471 /** 472 * Runnable for parallel observers 473 */ 474 class ParallelAsyncObserve extends AbstractAsyncObserve 475 { 476 private final AsyncObserver _observer; 477 478 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 479 { 480 super(event, transientVars, logger, allEventLogger); 481 _observer = observer; 482 } 483 484 @Override 485 protected void _observe() throws Exception 486 { 487 if (_logger.isDebugEnabled()) 488 { 489 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 490 } 491 if (_allEventLogger.isDebugEnabled()) 492 { 493 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 494 } 495 496 _observer.observe(_event, _transientVars); 497 } 498 } 499 500 /** 501 * Runnable for non parallel observers 502 */ 503 class NonParallelAsyncObserve extends AbstractAsyncObserve 504 { 505 private final Collection<AsyncObserver> _observers; 506 507 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 508 { 509 super(event, transientVars, logger, allEventLogger); 510 _observers = observers; 511 } 512 513 @Override 514 protected void _observe() throws Exception 515 { 516 for (AsyncObserver observer : _observers) 517 { 518 if (_logger.isDebugEnabled()) 519 { 520 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 521 } 522 if (_allEventLogger.isDebugEnabled()) 523 { 524 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 525 } 526 527 observer.observe(_event, _transientVars); 528 } 529 } 530 } 531 532 /** 533 * Thread factory for async observers. 534 * Set the thread name format and marks the thread as daemon. 535 */ 536 static class AsyncObserveThreadFactory implements ThreadFactory 537 { 538 private static ThreadFactory _defaultThreadFactory; 539 private static String _nameFormat; 540 private static AtomicLong _count; 541 542 public AsyncObserveThreadFactory() 543 { 544 _defaultThreadFactory = Executors.defaultThreadFactory(); 545 _nameFormat = "ametys-async-observe-%d"; 546 _count = new AtomicLong(0); 547 } 548 549 public Thread newThread(Runnable r) 550 { 551 Thread thread = _defaultThreadFactory.newThread(r); 552 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 553 thread.setDaemon(true); 554 555 return thread; 556 } 557 } 558 559 @Override 560 public void dispose() 561 { 562 if (__SINGLE_THREAD_EXECUTOR != null) 563 { 564 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 565 __SINGLE_THREAD_EXECUTOR = null; 566 } 567 if (__PARALLEL_THREAD_EXECUTOR != null) 568 { 569 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 570 __PARALLEL_THREAD_EXECUTOR = null; 571 } 572 573 _context = null; 574 _manager = null; 575 } 576 577 /** 578 * Registers an {@link Observer}. 579 * @param observer the {@link Observer}. 580 */ 581 public void registerObserver(Observer observer) 582 { 583 _registeredObservers.add(observer); 584 } 585 586 /** 587 * Unregisters an {@link Observer}. 588 * @param observer the {@link Observer}. 589 */ 590 public void unregisterObserver(Observer observer) 591 { 592 _registeredObservers.remove(observer); 593 } 594}