001/* 002 * Copyright 2018 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.plugins.extraction.execution.pipeline; 017 018import java.io.File; 019import java.io.IOException; 020import java.nio.file.Files; 021import java.nio.file.Path; 022import java.time.Instant; 023import java.time.temporal.ChronoUnit; 024import java.util.ArrayList; 025import java.util.HashMap; 026import java.util.List; 027import java.util.Map; 028import java.util.function.Function; 029import java.util.stream.Stream; 030 031import org.apache.avalon.framework.activity.Disposable; 032import org.apache.avalon.framework.activity.Initializable; 033import org.apache.avalon.framework.component.Component; 034import org.apache.avalon.framework.configuration.Configuration; 035import org.apache.avalon.framework.configuration.ConfigurationException; 036import org.apache.avalon.framework.configuration.DefaultConfigurationBuilder; 037import org.apache.avalon.framework.container.ContainerUtil; 038import org.apache.avalon.framework.service.ServiceException; 039import org.apache.avalon.framework.service.ServiceManager; 040import org.apache.avalon.framework.service.Serviceable; 041import org.apache.commons.lang3.StringUtils; 042import org.apache.commons.lang3.tuple.Pair; 043import org.apache.excalibur.source.SourceResolver; 044import org.apache.excalibur.source.impl.FileSource; 045import org.xml.sax.SAXException; 046 047import org.ametys.core.ui.Callable; 048import org.ametys.core.util.LambdaUtils; 049import org.ametys.core.util.URIUtils; 050import org.ametys.plugins.extraction.ExtractionConstants; 051import org.ametys.plugins.extraction.execution.pipeline.impl.ConfigurablePipelineDescriptor; 052import org.ametys.plugins.extraction.execution.pipeline.impl.NoOpPipelineDescriptor; 053import org.ametys.runtime.plugin.component.AbstractLogEnabled; 054 055/** 056 * Manager of {@link PipelineDescriptor}s 057 */ 058public class PipelineManager extends AbstractLogEnabled implements Component, Serviceable, Initializable, Disposable 059{ 060 /** The Avalon role. */ 061 public static final String ROLE = PipelineManager.class.getName(); 062 063 // '*' char is appended as it is a forbidden character for file names, it ensures the uniqueness of the id 064 private static final String __NO_OP_PIPELINE_ID = NoOpPipelineDescriptor.class.getName() + "*"; 065 066 private PipelineDescriptor _noOpPipelineDesc; 067 private Map<String, PipelineDescriptor> _pipelineDescCache = new HashMap<>(); 068 private Map<String, Long> _lastReading = new HashMap<>(); 069 070 private SourceResolver _resolver; 071 private PipelineSerializerModelExtensionPoint _serializerEP; 072 073 @Override 074 public void service(ServiceManager manager) throws ServiceException 075 { 076 _resolver = (SourceResolver) manager.lookup(SourceResolver.ROLE); 077 _serializerEP = (PipelineSerializerModelExtensionPoint) manager.lookup(PipelineSerializerModelExtensionPoint.ROLE); 078 } 079 080 @Override 081 public void initialize() throws Exception 082 { 083 PipelineSerializerModel xmlSerializer = _serializerEP.getExtension("xml"); 084 _noOpPipelineDesc = new NoOpPipelineDescriptor(xmlSerializer); 085 } 086 087 @Override 088 public void dispose() 089 { 090 _pipelineDescCache.clear(); 091 _lastReading.clear(); 092 } 093 094 /** 095 * Returns <code>true</code> if the {@link PipelineDescriptor} for asked path exists 096 * @param path The path of the {@link PipelineDescriptor} 097 * @return <code>true</code> if the asked pipeline exists 098 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 099 */ 100 public boolean has(String path) throws IOException 101 { 102 return get(path) != null; 103 } 104 105 /** 106 * Gets the {@link PipelineDescriptor} for the given path 107 * @param path The path of the pipeline 108 * @return the request {@link PipelineDescriptor}, or null if not found 109 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 110 */ 111 public PipelineDescriptor get(Path path) throws IOException 112 { 113 return get(path.toString()); 114 } 115 116 /** 117 * Gets the {@link PipelineDescriptor} for the given path 118 * @param path The path of the pipeline 119 * @return the request {@link PipelineDescriptor}, or null if not found 120 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 121 */ 122 public PipelineDescriptor get(String path) throws IOException 123 { 124 if (__NO_OP_PIPELINE_ID.equals(path)) 125 { 126 return _noOpPipelineDesc; 127 } 128 return _readAndGetPipeline(path); 129 } 130 131 /** 132 * Gets the available pipelines for the given extraction 133 * @param extractionId the extraction's definition file name 134 * @return the available pipelines 135 * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files 136 */ 137 @Callable (right = "Extraction_Rights_ExecuteExtraction") 138 public Map<String, Object> getAvailablePipelines(String extractionId) throws IOException 139 { 140 if (extractionId != null) 141 { 142 String decodedExtractionId = URIUtils.decode(extractionId); 143 List<Map<String, Object>> enumeration = _getJsonEnumeration(URIUtils.decode(decodedExtractionId)); 144 return Map.of("pipelines", enumeration); 145 } 146 147 return Map.of(); 148 } 149 150 private List<Map<String, Object>> _getJsonEnumeration(String extractionId) throws IOException 151 { 152 List<Map<String, Object>> enumeration = new ArrayList<>(); 153 _readAll(); 154 155 for (String pipelineId : _pipelineDescCache.keySet()) 156 { 157 PipelineDescriptor pipeline = _pipelineDescCache.get(pipelineId); 158 ExtractionMatcher matcher = pipeline.getExtractionMatcher(); 159 if (matcher.isHandled(extractionId)) 160 { 161 enumeration.add(Map.of( 162 "value", pipelineId, 163 "text", pipeline.getLabel())); 164 } 165 } 166 167 return enumeration; 168 } 169 170 /** 171 * Gets the id of the default {@link PipelineDescriptor} 172 * @return the id of the default {@link PipelineDescriptor} 173 */ 174 public String getDefaultPipeline() 175 { 176 return __NO_OP_PIPELINE_ID; 177 } 178 179 private synchronized PipelineDescriptor _readAndGetPipeline(String path) throws IOException 180 { 181 Path pipelinePath = _absolutePath(path); 182 if (Files.exists(pipelinePath)) 183 { 184 Pair<String, PipelineDescriptor> readPipeline = _readPipeline(pipelinePath); 185 if (readPipeline != null) 186 { 187 PipelineDescriptor pipeline = readPipeline.getRight(); 188 String id = readPipeline.getLeft(); 189 _pipelineDescCache.put(id, pipeline); 190 return pipeline; 191 } 192 } 193 return null; 194 } 195 196 private synchronized void _readAll() throws IOException 197 { 198 _readFromFolder(_absolutePath(StringUtils.EMPTY)) 199 .forEach(pair -> _pipelineDescCache.put(pair.getKey(), pair.getValue())); 200 201 // Add the default pipeline 202 _pipelineDescCache.put(__NO_OP_PIPELINE_ID, _noOpPipelineDesc); 203 } 204 205 private Path _absolutePath(String relativePath) throws IOException 206 { 207 FileSource pipelinesDirScr = null; 208 FileSource src = null; 209 try 210 { 211 pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR); 212 pipelinesDirScr.getFile().mkdirs(); 213 214 src = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR + relativePath); 215 File file = src.getFile(); 216 return file.toPath(); 217 } 218 finally 219 { 220 _resolver.release(pipelinesDirScr); 221 _resolver.release(src); 222 } 223 } 224 225 private Stream<Pair<String, PipelineDescriptor>> _readFromFolder(Path folderPath) throws IOException 226 { 227 return Files.list(folderPath) 228 .map(LambdaUtils.wrap(this::_readFromFile)) 229 .flatMap(Function.identity()); 230 } 231 232 private Stream<Pair<String, PipelineDescriptor>> _readFromFile(Path path) throws IOException 233 { 234 return Files.isDirectory(path) ? _readFromFolder(path) : Stream.ofNullable(_readPipeline(path)); 235 } 236 237 private Pair<String, PipelineDescriptor> _readPipeline(Path pipelinePath) throws IOException 238 { 239 String id = _getPipelineIdFromAbsolutePath(pipelinePath); 240 File pipelineFile = pipelinePath.toFile(); 241 long lastModified = (pipelineFile.lastModified() / 1000) * 1000; // second precision for Linux file systems 242 if (lastModified > _lastReading.getOrDefault(id, -1L)) 243 { 244 // cache is out of date => need to read 245 try 246 { 247 Configuration conf = new DefaultConfigurationBuilder().buildFromFile(pipelineFile); 248 PipelineDescriptor pipeline = new ConfigurablePipelineDescriptor(id, _resolver, _serializerEP); 249 ContainerUtil.configure(pipeline, conf); 250 return Pair.of(id, pipeline); 251 } 252 catch (ConfigurationException | SAXException | IOException e) 253 { 254 getLogger().error("File '{}' could not be parsed as an extraction pipeline.", pipelinePath, e); 255 return null; 256 } 257 finally 258 { 259 _lastReading.put(id, _now()); 260 } 261 } 262 else 263 { 264 // cache is up to date 265 PipelineDescriptor pipeline = _pipelineDescCache.get(id); 266 return pipeline == null ? null : Pair.of(id, pipeline); 267 } 268 } 269 270 private String _getPipelineIdFromAbsolutePath(Path pipelineAbsolutePath) throws IOException 271 { 272 FileSource pipelinesDirScr = null; 273 try 274 { 275 pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR); 276 File pipelinesFile = pipelinesDirScr.getFile(); 277 Path pipelineRelativePath = pipelinesFile.toPath().relativize(pipelineAbsolutePath); 278 return pipelineRelativePath.toString(); 279 } 280 finally 281 { 282 _resolver.release(pipelinesDirScr); 283 } 284 } 285 286 private static Long _now() 287 { 288 return Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli(); 289 } 290}