001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.List; 024import java.util.Map; 025import java.util.Optional; 026import java.util.concurrent.Callable; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ThreadFactory; 031import java.util.concurrent.atomic.AtomicLong; 032 033import org.apache.avalon.framework.CascadingRuntimeException; 034import org.apache.avalon.framework.activity.Disposable; 035import org.apache.avalon.framework.component.Component; 036import org.apache.avalon.framework.context.ContextException; 037import org.apache.avalon.framework.context.Contextualizable; 038import org.apache.avalon.framework.logger.AbstractLogEnabled; 039import org.apache.avalon.framework.service.ServiceException; 040import org.apache.avalon.framework.service.ServiceManager; 041import org.apache.avalon.framework.service.Serviceable; 042import org.apache.cocoon.Constants; 043import org.apache.cocoon.components.ContextHelper; 044import org.apache.cocoon.environment.Context; 045import org.apache.cocoon.environment.ObjectModelHelper; 046import org.apache.cocoon.environment.Request; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050import org.ametys.core.authentication.AuthenticateAction; 051import org.ametys.core.engine.BackgroundEngineHelper; 052import org.ametys.core.engine.BackgroundEnvironment; 053import org.ametys.core.user.UserIdentity; 054import org.ametys.core.user.population.UserPopulationDAO; 055import org.ametys.plugins.core.user.UserDAO; 056 057/** 058 * Manager for dispatching {@link Event} instances to {@link Observer}s. 059 */ 060public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 061{ 062 /** Avalon ROLE. */ 063 public static final String ROLE = ObservationManager.class.getName(); 064 065 private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures"; 066 private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args"; 067 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 068 069 /** 070 * The executor service managing the single thread pool. This threads is 071 * used to run non-parallelizable observers. 072 */ 073 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 074 075 /** 076 * The executor service managing the thread pool of asynchronous observers 077 * allowed to run in parallel. 078 */ 079 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 080 081 /** Cocoon context */ 082 protected Context _context; 083 084 /** Service manager */ 085 protected ServiceManager _manager; 086 087 private org.apache.avalon.framework.context.Context _avalonContext; 088 private ObserverExtensionPoint _observerExtPt; 089 private Collection<Observer> _registeredObservers = new ArrayList<>(); 090 091 @Override 092 public void service(ServiceManager manager) throws ServiceException 093 { 094 _manager = manager; 095 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 096 } 097 098 @Override 099 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 100 { 101 _avalonContext = context; 102 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 103 } 104 105 /** 106 * Notify of a event which will be dispatch to registered 107 * observers. 108 * @param event the event to notify. 109 * @return The {@link Future} objects of the asynchronous observers 110 */ 111 public List<Future> notify(final Event event) 112 { 113 List<Future> result = new ArrayList<>(); 114 115 try 116 { 117 if (getLogger().isDebugEnabled()) 118 { 119 getLogger().debug("Receiving " + event); 120 } 121 122 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 123 { 124 __ALL_EVENTS_LOGGER.info("Receiving " + event); 125 } 126 127 List<Observer> supportedObservers = new ArrayList<>(); 128 129 // Retrieve supported observers from extension point 130 for (String observerId : _observerExtPt.getExtensionsIds()) 131 { 132 Observer observer = _observerExtPt.getExtension(observerId); 133 _addSupportedObserver(supportedObservers, observer, event); 134 } 135 136 // Retrieve supported observers from manual registration 137 for (Observer observer : _registeredObservers) 138 { 139 _addSupportedObserver(supportedObservers, observer, event); 140 } 141 142 // Order observers (0 is first, Integer.MAX_INT is last) 143 Collections.sort(supportedObservers, Comparator.comparingInt( 144 observer -> observer.getPriority(event) 145 )); 146 147 // Observes the event and prepares the asynchronous observes. 148 _putAdditionalArgs(event); 149 _observesEvent(event, supportedObservers, result); 150 } 151 catch (Exception e) 152 { 153 // Observers should never fail, so just log the error 154 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 155 } 156 157 // Keep in current request the futures of the curent request 158 try 159 { 160 List<Future> futuresForCurrentRequest = _getFuturesForRequest(); 161 futuresForCurrentRequest.addAll(result); 162 } 163 catch (CascadingRuntimeException e) 164 { 165 // can happen if no request 166 // do nothing though 167 } 168 169 return result; 170 } 171 172 private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event) 173 { 174 if (getLogger().isDebugEnabled()) 175 { 176 getLogger().debug("Checking support for event: " + event + " and observer: " + observer); 177 } 178 179 if (observer.supports(event)) 180 { 181 if (getLogger().isDebugEnabled()) 182 { 183 getLogger().debug("Event: " + event + " supported for observer: " + observer); 184 } 185 186 supportedObservers.add(observer); 187 } 188 } 189 190 /** 191 * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request. 192 * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work. 193 * @return the {@link Future}s of the notified observers during the current request. 194 */ 195 public List<Future> getFuturesForRequest() 196 { 197 try 198 { 199 return Collections.unmodifiableList(_getFuturesForRequest()); 200 } 201 catch (CascadingRuntimeException e) 202 { 203 // can happen if no request 204 // return empty list 205 return Collections.emptyList(); 206 } 207 } 208 209 private List<Future> _getFuturesForRequest() throws CascadingRuntimeException 210 { 211 Request request = ContextHelper.getRequest(_avalonContext); 212 @SuppressWarnings("unchecked") 213 List<Future> futuresForCurrentRequest = (List<Future>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME); 214 if (futuresForCurrentRequest == null) 215 { 216 futuresForCurrentRequest = new ArrayList<>(); 217 request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest); 218 } 219 return futuresForCurrentRequest; 220 } 221 222 /** 223 * For all events of the given event id which will be notified during the current request, add an additional argument 224 * @param eventIds The event ids 225 * @param argName The name of the additional argument 226 * @param argValue The value of the additional argument 227 */ 228 public void addArgumentForEvents(String[] eventIds, String argName, Object argValue) 229 { 230 Request request = ContextHelper.getRequest(_avalonContext); 231 232 // Map eventId -> argName -> argValue 233 @SuppressWarnings("unchecked") 234 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 235 if (additionalArgsForEvents == null) 236 { 237 additionalArgsForEvents = new HashMap<>(); 238 request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents); 239 } 240 241 for (String eventId : eventIds) 242 { 243 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 244 if (additionalArgs == null) 245 { 246 additionalArgs = new HashMap<>(); 247 additionalArgsForEvents.put(eventId, additionalArgs); 248 } 249 250 additionalArgs.put(argName, argValue); 251 } 252 } 253 254 /** 255 * For all events of the given event id which will be notified during the current request, removes the additional argument 256 * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method. 257 * @param eventIds The event ids 258 * @param argName The name of the additional argument to remove 259 */ 260 public void removeArgumentForEvents(String[] eventIds, String argName) 261 { 262 Request request = ContextHelper.getRequest(_avalonContext); 263 264 // Map eventId -> argName -> argValue 265 @SuppressWarnings("unchecked") 266 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 267 if (additionalArgsForEvents == null) 268 { 269 return; 270 } 271 272 for (String eventId : eventIds) 273 { 274 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 275 if (additionalArgs == null) 276 { 277 break; 278 } 279 280 additionalArgs.remove(argName); 281 } 282 } 283 284 private void _putAdditionalArgs(final Event event) 285 { 286 String eventId = event.getId(); 287 288 Request request = null; 289 try 290 { 291 request = ContextHelper.getRequest(_avalonContext); 292 } 293 catch (CascadingRuntimeException e) 294 { 295 // can happen if no request 296 // return empty list 297 } 298 if (request != null) 299 { 300 @SuppressWarnings("unchecked") 301 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 302 if (additionalArgsForEvents != null) 303 { 304 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 305 if (additionalArgs != null) 306 { 307 event.getArguments().putAll(additionalArgs); 308 } 309 } 310 } 311 } 312 313 /** 314 * Observes the event 315 * @param event The event 316 * @param supportedObservers list of observers 317 * @param result The result object to fill 318 * @throws Exception on error 319 */ 320 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 321 { 322 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 323 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 324 325 Map<String, Object> transientVars = new HashMap<>(); 326 for (Observer supportedObserver : supportedObservers) 327 { 328 if (getLogger().isInfoEnabled()) 329 { 330 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 331 } 332 333 // Observe current event 334 if (supportedObserver instanceof AsyncObserver) 335 { 336 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 337 boolean parallelizable = asyncObserver.parallelizable(); 338 339 if (getLogger().isDebugEnabled()) 340 { 341 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 342 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 343 } 344 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 345 { 346 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 347 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 348 } 349 350 if (parallelizable) 351 { 352 parallelizableAsyncObservers.add(asyncObserver); 353 } 354 else 355 { 356 nonParallelizableAsyncObservers.add(asyncObserver); 357 } 358 } 359 else 360 { 361 try 362 { 363 supportedObserver.observe(event, transientVars); 364 } 365 catch (Throwable e) 366 { 367 // Observation process should continue 368 getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e); 369 } 370 } 371 } 372 373 // Observe for async observer. 374 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 375 { 376 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 377 } 378 } 379 380 /** 381 * Async observe through a thread pool 382 * @param parallelObservers parallelizable asynchronous observers 383 * @param nonParallelObservers non parallelizable asynchronous observers 384 * @param event The event to observe 385 * @param transientVars The observer transient vars 386 * @param result The future results 387 */ 388 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 389 { 390 if (__SINGLE_THREAD_EXECUTOR == null) 391 { 392 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 393 394 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 395 396 // 1 thread per core 397 __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); 398 } 399 400 if (!parallelObservers.isEmpty()) 401 { 402 for (AsyncObserver observer : parallelObservers) 403 { 404 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 405 result.add(future); 406 } 407 } 408 409 if (!nonParallelObservers.isEmpty()) 410 { 411 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 412 result.add(future); 413 } 414 } 415 416 /** 417 * Runnable to be used for asynchronous calls 418 */ 419 abstract class AbstractAsyncObserve implements Callable<Object> 420 { 421 /** event to observe */ 422 protected final Event _event; 423 protected final Map<String, Object> _transientVars; 424 protected final org.apache.avalon.framework.logger.Logger _logger; 425 protected final Logger _allEventLogger; 426 427 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 428 { 429 _event = event; 430 _transientVars = transientVars; 431 _logger = logger; 432 _allEventLogger = allEventLogger; 433 } 434 435 @Override 436 public Object call() 437 { 438 Map<String, Object> environmentInformation = null; 439 Integer computedResult = 0; 440 try 441 { 442 // Create the environment. 443 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 444 445 _setUserInSession(environmentInformation); 446 447 _observe(); 448 } 449 catch (Exception e) 450 { 451 // Observer must never fail, so just log the error 452 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 453 computedResult = -1; 454 } 455 finally 456 { 457 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 458 459 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 460 } 461 462 return computedResult; 463 } 464 465 private boolean _setUserInSession(Map<String, Object> environmentInformation) 466 { 467 UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer()) 468 .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY); 469 470 BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment"); 471 Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel()); 472 473 AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true); 474 475 return true; 476 } 477 478 /** 479 * Abstract observe method where the observation should be done 480 * @throws Exception on error 481 */ 482 protected abstract void _observe() throws Exception; 483 } 484 485 /** 486 * Runnable for parallel observers 487 */ 488 class ParallelAsyncObserve extends AbstractAsyncObserve 489 { 490 private final AsyncObserver _observer; 491 492 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 493 { 494 super(event, transientVars, logger, allEventLogger); 495 _observer = observer; 496 } 497 498 @Override 499 protected void _observe() throws Exception 500 { 501 if (_logger.isDebugEnabled()) 502 { 503 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 504 } 505 if (_allEventLogger.isDebugEnabled()) 506 { 507 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 508 } 509 510 _observer.observe(_event, _transientVars); 511 } 512 } 513 514 /** 515 * Runnable for non parallel observers 516 */ 517 class NonParallelAsyncObserve extends AbstractAsyncObserve 518 { 519 private final Collection<AsyncObserver> _observers; 520 521 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 522 { 523 super(event, transientVars, logger, allEventLogger); 524 _observers = observers; 525 } 526 527 @Override 528 protected void _observe() throws Exception 529 { 530 for (AsyncObserver observer : _observers) 531 { 532 if (_logger.isDebugEnabled()) 533 { 534 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 535 } 536 if (_allEventLogger.isDebugEnabled()) 537 { 538 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 539 } 540 541 observer.observe(_event, _transientVars); 542 } 543 } 544 } 545 546 /** 547 * Thread factory for async observers. 548 * Set the thread name format and marks the thread as daemon. 549 */ 550 static class AsyncObserveThreadFactory implements ThreadFactory 551 { 552 private static ThreadFactory _defaultThreadFactory; 553 private static String _nameFormat; 554 private static AtomicLong _count; 555 556 public AsyncObserveThreadFactory() 557 { 558 _defaultThreadFactory = Executors.defaultThreadFactory(); 559 _nameFormat = "ametys-async-observe-%d"; 560 _count = new AtomicLong(0); 561 } 562 563 public Thread newThread(Runnable r) 564 { 565 Thread thread = _defaultThreadFactory.newThread(r); 566 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 567 thread.setDaemon(true); 568 569 return thread; 570 } 571 } 572 573 @Override 574 public void dispose() 575 { 576 if (__SINGLE_THREAD_EXECUTOR != null) 577 { 578 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 579 __SINGLE_THREAD_EXECUTOR = null; 580 } 581 if (__PARALLEL_THREAD_EXECUTOR != null) 582 { 583 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 584 __PARALLEL_THREAD_EXECUTOR = null; 585 } 586 587 _context = null; 588 _manager = null; 589 } 590 591 /** 592 * Registers an {@link Observer}. 593 * @param observer the {@link Observer}. 594 */ 595 public void registerObserver(Observer observer) 596 { 597 _registeredObservers.add(observer); 598 } 599 600 /** 601 * Unregisters an {@link Observer}. 602 * @param observer the {@link Observer}. 603 */ 604 public void unregisterObserver(Observer observer) 605 { 606 _registeredObservers.remove(observer); 607 } 608}