001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.Executors; 028import java.util.concurrent.Future; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.atomic.AtomicLong; 031 032import org.apache.avalon.framework.CascadingRuntimeException; 033import org.apache.avalon.framework.activity.Disposable; 034import org.apache.avalon.framework.component.Component; 035import org.apache.avalon.framework.context.ContextException; 036import org.apache.avalon.framework.context.Contextualizable; 037import org.apache.avalon.framework.logger.AbstractLogEnabled; 038import org.apache.avalon.framework.service.ServiceException; 039import org.apache.avalon.framework.service.ServiceManager; 040import org.apache.avalon.framework.service.Serviceable; 041import org.apache.cocoon.Constants; 042import org.apache.cocoon.components.ContextHelper; 043import org.apache.cocoon.environment.Context; 044import org.apache.cocoon.environment.Request; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048import org.ametys.core.engine.BackgroundEngineHelper; 049 050/** 051 * Manager for dispatching {@link Event} instances to {@link Observer}s. 052 */ 053public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 054{ 055 /** Avalon ROLE. */ 056 public static final String ROLE = ObservationManager.class.getName(); 057 058 private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures"; 059 private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args"; 060 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 061 062 /** 063 * The executor service managing the single thread pool. This threads is 064 * used to run non-parallelizable observers. 065 */ 066 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 067 068 /** 069 * The executor service managing the thread pool of asynchronous observers 070 * allowed to run in parallel. 071 */ 072 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 073 074 /** Cocoon context */ 075 protected Context _context; 076 077 /** Service manager */ 078 protected ServiceManager _manager; 079 080 private org.apache.avalon.framework.context.Context _avalonContext; 081 private ObserverExtensionPoint _observerExtPt; 082 private Collection<Observer> _registeredObservers = new ArrayList<>(); 083 084 @Override 085 public void service(ServiceManager manager) throws ServiceException 086 { 087 _manager = manager; 088 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 089 } 090 091 @Override 092 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 093 { 094 _avalonContext = context; 095 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 096 } 097 098 /** 099 * Notify of a event which will be dispatch to registered 100 * observers. 101 * @param event the event to notify. 102 * @return The {@link Future} objects of the asynchronous observers 103 */ 104 public List<Future> notify(final Event event) 105 { 106 List<Future> result = new ArrayList<>(); 107 108 try 109 { 110 if (getLogger().isDebugEnabled()) 111 { 112 getLogger().debug("Receiving " + event); 113 } 114 115 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 116 { 117 __ALL_EVENTS_LOGGER.info("Receiving " + event); 118 } 119 120 List<Observer> supportedObservers = new ArrayList<>(); 121 122 // Retrieve supported observers from extension point 123 for (String observerId : _observerExtPt.getExtensionsIds()) 124 { 125 Observer observer = _observerExtPt.getExtension(observerId); 126 _addSupportedObserver(supportedObservers, observer, event); 127 } 128 129 // Retrieve supported observers from manual registration 130 for (Observer observer : _registeredObservers) 131 { 132 _addSupportedObserver(supportedObservers, observer, event); 133 } 134 135 // Order observers (0 is first, Integer.MAX_INT is last) 136 Collections.sort(supportedObservers, Comparator.comparingInt( 137 observer -> observer.getPriority(event) 138 )); 139 140 // Observes the event and prepares the asynchronous observes. 141 _putAdditionalArgs(event); 142 _observesEvent(event, supportedObservers, result); 143 } 144 catch (Exception e) 145 { 146 // Observers should never fail, so just log the error 147 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 148 } 149 150 // Keep in current request the futures of the curent request 151 try 152 { 153 List<Future> futuresForCurrentRequest = _getFuturesForRequest(); 154 futuresForCurrentRequest.addAll(result); 155 } 156 catch (CascadingRuntimeException e) 157 { 158 // can happen if no request 159 // do nothing though 160 } 161 162 return result; 163 } 164 165 private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event) 166 { 167 if (getLogger().isDebugEnabled()) 168 { 169 getLogger().debug("Checking support for event: " + event + " and observer: " + observer); 170 } 171 172 if (observer.supports(event)) 173 { 174 if (getLogger().isDebugEnabled()) 175 { 176 getLogger().debug("Event: " + event + " supported for observer: " + observer); 177 } 178 179 supportedObservers.add(observer); 180 } 181 } 182 183 /** 184 * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request. 185 * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work. 186 * @return the {@link Future}s of the notified observers during the current request. 187 */ 188 public List<Future> getFuturesForRequest() 189 { 190 try 191 { 192 return Collections.unmodifiableList(_getFuturesForRequest()); 193 } 194 catch (CascadingRuntimeException e) 195 { 196 // can happen if no request 197 // return empty list 198 return Collections.emptyList(); 199 } 200 } 201 202 private List<Future> _getFuturesForRequest() throws CascadingRuntimeException 203 { 204 Request request = ContextHelper.getRequest(_avalonContext); 205 @SuppressWarnings("unchecked") 206 List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME); 207 if (futuresForCurrentRequest == null) 208 { 209 futuresForCurrentRequest = new ArrayList<>(); 210 request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest); 211 } 212 return futuresForCurrentRequest; 213 } 214 215 /** 216 * For all events of the given event id which will be notified during the current request, add an additional argument 217 * @param eventIds The event ids 218 * @param argName The name of the additional argument 219 * @param argValue The value of the additional argument 220 */ 221 public void addArgumentForEvents(String[] eventIds, String argName, Object argValue) 222 { 223 Request request = ContextHelper.getRequest(_avalonContext); 224 225 // Map eventId -> argName -> argValue 226 @SuppressWarnings("unchecked") 227 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 228 if (additionalArgsForEvents == null) 229 { 230 additionalArgsForEvents = new HashMap<>(); 231 request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents); 232 } 233 234 for (String eventId : eventIds) 235 { 236 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 237 if (additionalArgs == null) 238 { 239 additionalArgs = new HashMap<>(); 240 additionalArgsForEvents.put(eventId, additionalArgs); 241 } 242 243 additionalArgs.put(argName, argValue); 244 } 245 } 246 247 /** 248 * For all events of the given event id which will be notified during the current request, removes the additional argument 249 * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method. 250 * @param eventIds The event ids 251 * @param argName The name of the additional argument to remove 252 */ 253 public void removeArgumentForEvents(String[] eventIds, String argName) 254 { 255 Request request = ContextHelper.getRequest(_avalonContext); 256 257 // Map eventId -> argName -> argValue 258 @SuppressWarnings("unchecked") 259 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 260 if (additionalArgsForEvents == null) 261 { 262 return; 263 } 264 265 for (String eventId : eventIds) 266 { 267 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 268 if (additionalArgs == null) 269 { 270 break; 271 } 272 273 additionalArgs.remove(argName); 274 } 275 } 276 277 private void _putAdditionalArgs(final Event event) 278 { 279 String eventId = event.getId(); 280 281 Request request = null; 282 try 283 { 284 request = ContextHelper.getRequest(_avalonContext); 285 } 286 catch (CascadingRuntimeException e) 287 { 288 // can happen if no request 289 // return empty list 290 } 291 if (request != null) 292 { 293 @SuppressWarnings("unchecked") 294 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 295 if (additionalArgsForEvents != null) 296 { 297 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 298 if (additionalArgs != null) 299 { 300 event.getArguments().putAll(additionalArgs); 301 } 302 } 303 } 304 } 305 306 /** 307 * Observes the event 308 * @param event The event 309 * @param supportedObservers list of observers 310 * @param result The result object to fill 311 * @throws Exception on error 312 */ 313 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 314 { 315 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 316 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 317 318 Map<String, Object> transientVars = new HashMap<>(); 319 for (Observer supportedObserver : supportedObservers) 320 { 321 if (getLogger().isInfoEnabled()) 322 { 323 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 324 } 325 326 // Observe current event 327 if (supportedObserver instanceof AsyncObserver) 328 { 329 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 330 boolean parallelizable = asyncObserver.parallelizable(); 331 332 if (getLogger().isDebugEnabled()) 333 { 334 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 335 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 336 } 337 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 338 { 339 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 340 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 341 } 342 343 if (parallelizable) 344 { 345 parallelizableAsyncObservers.add(asyncObserver); 346 } 347 else 348 { 349 nonParallelizableAsyncObservers.add(asyncObserver); 350 } 351 } 352 else 353 { 354 try 355 { 356 supportedObserver.observe(event, transientVars); 357 } 358 catch (Throwable e) 359 { 360 // Observation process should continue 361 getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e); 362 } 363 } 364 } 365 366 // Observe for async observer. 367 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 368 { 369 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 370 } 371 } 372 373 /** 374 * Async observe through a thread pool 375 * @param parallelObservers parallelizable asynchronous observers 376 * @param nonParallelObservers non parallelizable asynchronous observers 377 * @param event The event to observe 378 * @param transientVars The observer transient vars 379 * @param result The future results 380 */ 381 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 382 { 383 if (__SINGLE_THREAD_EXECUTOR == null) 384 { 385 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 386 387 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 388 389 // 1 thread per core 390 __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); 391 } 392 393 if (!parallelObservers.isEmpty()) 394 { 395 for (AsyncObserver observer : parallelObservers) 396 { 397 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 398 result.add(future); 399 } 400 } 401 402 if (!nonParallelObservers.isEmpty()) 403 { 404 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 405 result.add(future); 406 } 407 } 408 409 /** 410 * Runnable to be used for asynchronous calls 411 */ 412 abstract class AbstractAsyncObserve implements Callable<Object> 413 { 414 /** event to observe */ 415 protected final Event _event; 416 protected final Map<String, Object> _transientVars; 417 protected final org.apache.avalon.framework.logger.Logger _logger; 418 protected final Logger _allEventLogger; 419 420 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 421 { 422 _event = event; 423 _transientVars = transientVars; 424 _logger = logger; 425 _allEventLogger = allEventLogger; 426 } 427 428 @Override 429 public Object call() 430 { 431 Map<String, Object> environmentInformation = null; 432 Integer computedResult = 0; 433 try 434 { 435 // Create the environment. 436 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 437 438 _observe(); 439 } 440 catch (Exception e) 441 { 442 // Observer must never fail, so just log the error 443 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 444 computedResult = -1; 445 } 446 finally 447 { 448 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 449 450 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 451 } 452 453 return computedResult; 454 } 455 456 /** 457 * Abstract observe method where the observation should be done 458 * @throws Exception on error 459 */ 460 protected abstract void _observe() throws Exception; 461 } 462 463 /** 464 * Runnable for parallel observers 465 */ 466 class ParallelAsyncObserve extends AbstractAsyncObserve 467 { 468 private final AsyncObserver _observer; 469 470 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 471 { 472 super(event, transientVars, logger, allEventLogger); 473 _observer = observer; 474 } 475 476 @Override 477 protected void _observe() throws Exception 478 { 479 if (_logger.isDebugEnabled()) 480 { 481 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 482 } 483 if (_allEventLogger.isDebugEnabled()) 484 { 485 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 486 } 487 488 _observer.observe(_event, _transientVars); 489 } 490 } 491 492 /** 493 * Runnable for non parallel observers 494 */ 495 class NonParallelAsyncObserve extends AbstractAsyncObserve 496 { 497 private final Collection<AsyncObserver> _observers; 498 499 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 500 { 501 super(event, transientVars, logger, allEventLogger); 502 _observers = observers; 503 } 504 505 @Override 506 protected void _observe() throws Exception 507 { 508 for (AsyncObserver observer : _observers) 509 { 510 if (_logger.isDebugEnabled()) 511 { 512 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 513 } 514 if (_allEventLogger.isDebugEnabled()) 515 { 516 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 517 } 518 519 observer.observe(_event, _transientVars); 520 } 521 } 522 } 523 524 /** 525 * Thread factory for async observers. 526 * Set the thread name format and marks the thread as daemon. 527 */ 528 static class AsyncObserveThreadFactory implements ThreadFactory 529 { 530 private static ThreadFactory _defaultThreadFactory; 531 private static String _nameFormat; 532 private static AtomicLong _count; 533 534 public AsyncObserveThreadFactory() 535 { 536 _defaultThreadFactory = Executors.defaultThreadFactory(); 537 _nameFormat = "ametys-async-observe-%d"; 538 _count = new AtomicLong(0); 539 } 540 541 public Thread newThread(Runnable r) 542 { 543 Thread thread = _defaultThreadFactory.newThread(r); 544 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 545 thread.setDaemon(true); 546 547 return thread; 548 } 549 } 550 551 @Override 552 public void dispose() 553 { 554 if (__SINGLE_THREAD_EXECUTOR != null) 555 { 556 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 557 __SINGLE_THREAD_EXECUTOR = null; 558 } 559 if (__PARALLEL_THREAD_EXECUTOR != null) 560 { 561 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 562 __PARALLEL_THREAD_EXECUTOR = null; 563 } 564 565 _context = null; 566 _manager = null; 567 } 568 569 /** 570 * Registers an {@link Observer}. 571 * @param observer the {@link Observer}. 572 */ 573 public void registerObserver(Observer observer) 574 { 575 _registeredObservers.add(observer); 576 } 577 578 /** 579 * Unregisters an {@link Observer}. 580 * @param observer the {@link Observer}. 581 */ 582 public void unregisterObserver(Observer observer) 583 { 584 _registeredObservers.remove(observer); 585 } 586}