001/* 002 * Copyright 2018 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.plugins.extraction.execution.pipeline; 017 018import java.io.File; 019import java.io.IOException; 020import java.nio.file.Files; 021import java.nio.file.Path; 022import java.time.Instant; 023import java.time.temporal.ChronoUnit; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.Map.Entry; 029import java.util.function.Function; 030import java.util.stream.Stream; 031 032import org.apache.avalon.framework.activity.Disposable; 033import org.apache.avalon.framework.activity.Initializable; 034import org.apache.avalon.framework.component.Component; 035import org.apache.avalon.framework.configuration.Configuration; 036import org.apache.avalon.framework.configuration.ConfigurationException; 037import org.apache.avalon.framework.configuration.DefaultConfigurationBuilder; 038import org.apache.avalon.framework.container.ContainerUtil; 039import org.apache.avalon.framework.service.ServiceException; 040import org.apache.avalon.framework.service.ServiceManager; 041import org.apache.avalon.framework.service.Serviceable; 042import org.apache.commons.lang3.StringUtils; 043import org.apache.commons.lang3.tuple.Pair; 044import org.apache.excalibur.source.SourceResolver; 045import org.apache.excalibur.source.impl.FileSource; 046import org.xml.sax.SAXException; 047 048import org.ametys.core.cache.AbstractCacheManager; 049import org.ametys.core.cache.Cache; 050import org.ametys.core.ui.Callable; 051import org.ametys.core.util.LambdaUtils; 052import org.ametys.core.util.URIUtils; 053import org.ametys.plugins.extraction.ExtractionConstants; 054import org.ametys.plugins.extraction.execution.pipeline.impl.ConfigurablePipelineDescriptor; 055import org.ametys.plugins.extraction.execution.pipeline.impl.NoOpPipelineDescriptor; 056import org.ametys.runtime.i18n.I18nizableText; 057import org.ametys.runtime.plugin.component.AbstractLogEnabled; 058 059/** 060 * Manager of {@link PipelineDescriptor}s 061 */ 062public class PipelineManager extends AbstractLogEnabled implements Component, Serviceable, Initializable, Disposable 063{ 064 /** The Avalon role. */ 065 public static final String ROLE = PipelineManager.class.getName(); 066 067 // '*' char is appended as it is a forbidden character for file names, it ensures the uniqueness of the id 068 private static final String __NO_OP_PIPELINE_ID = NoOpPipelineDescriptor.class.getName() + "*"; 069 070 private static final String __PIPELINE_DESCRIPTOR_CACHE = "plugin-extraction.PipelineDescriptor"; 071 072 private PipelineDescriptor _noOpPipelineDesc; 073 private Map<String, Long> _lastReading = new HashMap<>(); 074 075 private AbstractCacheManager _cacheManager; 076 private SourceResolver _resolver; 077 private PipelineSerializerModelExtensionPoint _serializerEP; 078 079 080 @Override 081 public void service(ServiceManager manager) throws ServiceException 082 { 083 _cacheManager = (AbstractCacheManager) manager.lookup(AbstractCacheManager.ROLE); 084 _resolver = (SourceResolver) manager.lookup(SourceResolver.ROLE); 085 _serializerEP = (PipelineSerializerModelExtensionPoint) manager.lookup(PipelineSerializerModelExtensionPoint.ROLE); 086 } 087 088 @Override 089 public void initialize() throws Exception 090 { 091 PipelineSerializerModel xmlSerializer = _serializerEP.getExtension("xml"); 092 _noOpPipelineDesc = new NoOpPipelineDescriptor(xmlSerializer); 093 094 _cacheManager.createMemoryCache(__PIPELINE_DESCRIPTOR_CACHE, 095 new I18nizableText("plugin.extraction", "PLUGINS_EXTRACTION_CACHE_PIPELINE_DESCRIPTOR_LABEL"), 096 new I18nizableText("plugin.extraction", "PLUGINS_EXTRACTION_CACHE_PIPELINE_DESCRIPTOR_DESCRIPTION"), 097 true, 098 null); 099 } 100 101 @Override 102 public void dispose() 103 { 104 _getPipelineDescriptorCache().invalidateAll(); 105 _lastReading.clear(); 106 } 107 108 /** 109 * Returns <code>true</code> if the {@link PipelineDescriptor} for asked path exists 110 * @param path The path of the {@link PipelineDescriptor} 111 * @return <code>true</code> if the asked pipeline exists 112 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 113 */ 114 public boolean has(String path) throws IOException 115 { 116 return get(path) != null; 117 } 118 119 /** 120 * Gets the {@link PipelineDescriptor} for the given path 121 * @param path The path of the pipeline 122 * @return the request {@link PipelineDescriptor}, or null if not found 123 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 124 */ 125 public PipelineDescriptor get(Path path) throws IOException 126 { 127 return get(path.toString()); 128 } 129 130 /** 131 * Gets the {@link PipelineDescriptor} for the given path 132 * @param path The path of the pipeline 133 * @return the request {@link PipelineDescriptor}, or null if not found 134 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 135 */ 136 public PipelineDescriptor get(String path) throws IOException 137 { 138 if (__NO_OP_PIPELINE_ID.equals(path)) 139 { 140 return _noOpPipelineDesc; 141 } 142 return _readAndGetPipeline(path); 143 } 144 145 /** 146 * Gets the available pipelines for the given extraction 147 * @param extractionId the extraction's definition file name 148 * @return the available pipelines 149 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 150 */ 151 @Callable (rights = "Extraction_Rights_ExecuteExtraction") 152 public Map<String, Object> getAvailablePipelines(String extractionId) throws IOException 153 { 154 if (extractionId != null) 155 { 156 String decodedExtractionId = URIUtils.decode(extractionId); 157 List<Map<String, Object>> enumeration = _getJsonEnumeration(URIUtils.decode(decodedExtractionId)); 158 return Map.of("pipelines", enumeration); 159 } 160 161 return Map.of(); 162 } 163 164 private List<Map<String, Object>> _getJsonEnumeration(String extractionId) throws IOException 165 { 166 List<Map<String, Object>> enumeration = new ArrayList<>(); 167 _readAll(); 168 169 for (Entry<String , PipelineDescriptor> entry : _getPipelineDescriptorCache().asMap().entrySet()) 170 { 171 PipelineDescriptor pipeline = entry.getValue(); 172 ExtractionMatcher matcher = pipeline.getExtractionMatcher(); 173 if (matcher.isHandled(extractionId)) 174 { 175 enumeration.add(Map.of( 176 "value", entry.getKey(), 177 "text", pipeline.getLabel())); 178 } 179 } 180 181 return enumeration; 182 } 183 184 /** 185 * Gets the id of the default {@link PipelineDescriptor} 186 * @return the id of the default {@link PipelineDescriptor} 187 */ 188 public String getDefaultPipeline() 189 { 190 return __NO_OP_PIPELINE_ID; 191 } 192 193 private synchronized PipelineDescriptor _readAndGetPipeline(String path) throws IOException 194 { 195 Path pipelinePath = _absolutePath(path); 196 if (Files.exists(pipelinePath)) 197 { 198 Pair<String, PipelineDescriptor> readPipeline = _readPipeline(pipelinePath); 199 if (readPipeline != null) 200 { 201 PipelineDescriptor pipeline = readPipeline.getRight(); 202 String id = readPipeline.getLeft(); 203 _getPipelineDescriptorCache().put(id, pipeline); 204 return pipeline; 205 } 206 } 207 return null; 208 } 209 210 private synchronized void _readAll() throws IOException 211 { 212 _readFromFolder(_absolutePath(StringUtils.EMPTY)) 213 .forEach(pair -> _getPipelineDescriptorCache().put(pair.getKey(), pair.getValue())); 214 215 // Add the default pipeline 216 _getPipelineDescriptorCache().put(__NO_OP_PIPELINE_ID, _noOpPipelineDesc); 217 } 218 219 private Path _absolutePath(String relativePath) throws IOException 220 { 221 FileSource pipelinesDirScr = null; 222 FileSource src = null; 223 try 224 { 225 pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR); 226 pipelinesDirScr.getFile().mkdirs(); 227 228 src = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR + relativePath); 229 File file = src.getFile(); 230 return file.toPath(); 231 } 232 finally 233 { 234 _resolver.release(pipelinesDirScr); 235 _resolver.release(src); 236 } 237 } 238 239 private Stream<Pair<String, PipelineDescriptor>> _readFromFolder(Path folderPath) throws IOException 240 { 241 return Files.list(folderPath) 242 .map(LambdaUtils.wrap(this::_readFromFile)) 243 .flatMap(Function.identity()); 244 } 245 246 private Stream<Pair<String, PipelineDescriptor>> _readFromFile(Path path) throws IOException 247 { 248 return Files.isDirectory(path) ? _readFromFolder(path) : Stream.ofNullable(_readPipeline(path)); 249 } 250 251 private Pair<String, PipelineDescriptor> _readPipeline(Path pipelinePath) throws IOException 252 { 253 String id = _getPipelineIdFromAbsolutePath(pipelinePath); 254 File pipelineFile = pipelinePath.toFile(); 255 long lastModified = (pipelineFile.lastModified() / 1000) * 1000; // second precision for Linux file systems 256 if (lastModified > _lastReading.getOrDefault(id, -1L)) 257 { 258 // cache is out of date => need to read 259 try 260 { 261 Configuration conf = new DefaultConfigurationBuilder().buildFromFile(pipelineFile); 262 PipelineDescriptor pipeline = new ConfigurablePipelineDescriptor(id, _resolver, _serializerEP); 263 ContainerUtil.configure(pipeline, conf); 264 return Pair.of(id, pipeline); 265 } 266 catch (ConfigurationException | SAXException | IOException e) 267 { 268 getLogger().error("File '{}' could not be parsed as an extraction pipeline.", pipelinePath, e); 269 return null; 270 } 271 finally 272 { 273 _lastReading.put(id, _now()); 274 } 275 } 276 else 277 { 278 // cache is up to date 279 PipelineDescriptor pipeline = _getPipelineDescriptorCache().get(id); 280 return pipeline == null ? null : Pair.of(id, pipeline); 281 } 282 } 283 284 private String _getPipelineIdFromAbsolutePath(Path pipelineAbsolutePath) throws IOException 285 { 286 FileSource pipelinesDirScr = null; 287 try 288 { 289 pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR); 290 File pipelinesFile = pipelinesDirScr.getFile(); 291 Path pipelineRelativePath = pipelinesFile.toPath().relativize(pipelineAbsolutePath); 292 return pipelineRelativePath.toString(); 293 } 294 finally 295 { 296 _resolver.release(pipelinesDirScr); 297 } 298 } 299 300 private static Long _now() 301 { 302 return Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli(); 303 } 304 305 private Cache<String, PipelineDescriptor> _getPipelineDescriptorCache() 306 { 307 return _cacheManager.get(__PIPELINE_DESCRIPTOR_CACHE); 308 } 309}