001/* 002 * Copyright 2015 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.plugins.syndication; 017 018import java.io.IOException; 019import java.io.InputStream; 020import java.io.Reader; 021import java.time.Duration; 022import java.time.ZonedDateTime; 023import java.util.Collection; 024import java.util.concurrent.CountDownLatch; 025 026import org.apache.avalon.framework.activity.Disposable; 027import org.apache.avalon.framework.activity.Initializable; 028import org.apache.avalon.framework.component.Component; 029import org.apache.avalon.framework.logger.AbstractLogEnabled; 030import org.apache.avalon.framework.logger.Logger; 031import org.apache.avalon.framework.service.ServiceException; 032import org.apache.avalon.framework.service.ServiceManager; 033import org.apache.avalon.framework.service.Serviceable; 034import org.apache.hc.client5.http.classic.HttpClient; 035import org.apache.hc.client5.http.classic.methods.HttpGet; 036import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; 037import org.apache.hc.core5.http.HttpEntity; 038import org.apache.hc.core5.io.CloseMode; 039 040import org.ametys.core.cache.AbstractCacheManager; 041import org.ametys.core.cache.Cache; 042import org.ametys.core.util.HttpUtils; 043import org.ametys.runtime.config.Config; 044import org.ametys.runtime.i18n.I18nizableText; 045 046import com.rometools.rome.feed.synd.SyndFeed; 047import com.rometools.rome.io.FeedException; 048import com.rometools.rome.io.SyndFeedInput; 049import com.rometools.rome.io.XmlReader; 050 051/** 052 * Feed cache, supports preloading multiple feeds in multiple concurrent threads. 053 */ 054public class FeedCache extends AbstractLogEnabled implements Component, Initializable, Serviceable, Disposable 055{ 056 /** The component role. */ 057 public static final String ROLE = FeedCache.class.getName(); 058 059 /** The feed cache id */ 060 protected static final String CACHE_ID = ROLE + "$feedCache"; 061 062 /** The cache manager */ 063 protected AbstractCacheManager _cacheManager; 064 /** The user information cache. */ 065 protected Cache<String, FeedResult> _cache; 066 067 private CloseableHttpClient _httpClient; 068 069 public void service(ServiceManager manager) throws ServiceException 070 { 071 _cacheManager = (AbstractCacheManager) manager.lookup(AbstractCacheManager.ROLE); 072 } 073 074 @Override 075 public void initialize() throws Exception 076 { 077 int timeoutValue = Config.getInstance().getValue("syndication.timeout", false, 2000L).intValue(); 078 _httpClient = HttpUtils.createHttpClient(-1, timeoutValue); 079 080 _cacheManager.createMemoryCache( 081 CACHE_ID, 082 new I18nizableText("plugin.syndication", "PLUGINS_SYNDICATION_FEED_CACHE_LABEL"), 083 new I18nizableText("plugin.syndication", "PLUGINS_SYNDICATION_FEED_CACHE_DESC"), 084 true, 085 Duration.ofDays(1)); 086 087 _cache = _cacheManager.get(CACHE_ID); 088 } 089 090 /** 091 * Pre-load a collection of feeds, one by thread, to avoid getting timeouts sequentially. 092 * @param feeds the feeds to preload. 093 */ 094 public void preload(Collection<String> feeds) 095 { 096 if (getLogger().isDebugEnabled()) 097 { 098 getLogger().debug("Preloading " + feeds.size() + " feeds..."); 099 } 100 101 long start = System.currentTimeMillis(); 102 103 // Synchronization helper. 104 CountDownLatch doneSignal = new CountDownLatch(feeds.size()); 105 106 for (String feed : feeds) 107 { 108 // Test if the feed is already in the cache. 109 if (!_cache.hasKey(feed)) 110 { 111 // The feed is not in the cache: prepare and launch a thread to retrieve it. 112 new Thread(() -> { 113 try 114 { 115 if (getLogger().isDebugEnabled()) 116 { 117 getLogger().debug("Preparing to load the URL '" + feed + "'"); 118 } 119 120 long tStart = System.currentTimeMillis(); 121 122 // Load the feed and signal that it's done. 123 _cache.get(feed, u -> FeedCache.loadFeed(u, _httpClient, getLogger())); 124 125 if (getLogger().isDebugEnabled()) 126 { 127 long tEnd = System.currentTimeMillis(); 128 getLogger().debug("URL '" + feed + "' was loaded successfully in " + (tEnd - tStart) + " millis."); 129 } 130 } 131 catch (Exception e) 132 { 133 getLogger().error("An error occurred loading the URL '" + feed + "'", e); 134 } 135 finally 136 { 137 // Signal that the worker has finished. 138 doneSignal.countDown(); 139 } 140 }).start(); 141 } 142 else 143 { 144 // The feed is already in the cache: count down. 145 doneSignal.countDown(); 146 } 147 } 148 149 try 150 { 151 // Wait for all the URLs to be loaded (i.e. all the threads to end). 152 doneSignal.await(); 153 154 if (getLogger().isDebugEnabled()) 155 { 156 long end = System.currentTimeMillis(); 157 getLogger().debug("Feed preloading ended in " + (end - start) + " millis."); 158 } 159 } 160 catch (InterruptedException e) 161 { 162 // Ignore, let the threads finish or die. 163 } 164 } 165 166 /** 167 * Get a feed not from the cache. 168 * @param feedUrl the feed. 169 * @return the feed result. 170 * @throws IOException if an error occurs while loading the feed 171 */ 172 public FeedResult getFeedNoCache(String feedUrl) throws IOException 173 { 174 return loadFeed(feedUrl, _httpClient, getLogger()); 175 } 176 177 /** 178 * Get a feed. 179 * @param feedUrl the feed. 180 * @param lifeTime the amount of date or time to be added to the field 181 * @return the feed response. 182 */ 183 public FeedResult getFeed(String feedUrl, int lifeTime) 184 { 185 FeedResult feedResult = _cache.get(feedUrl, u -> loadFeed(u, _httpClient, getLogger())); 186 ZonedDateTime dateFeed = feedResult.getCreationDate(); 187 188 if (ZonedDateTime.now().isAfter(dateFeed.plusMinutes(lifeTime))) 189 { 190 _cache.invalidate(feedUrl); 191 return _cache.get(feedUrl, u -> loadFeed(u, _httpClient, getLogger())); 192 } 193 else 194 { 195 return feedResult; 196 } 197 } 198 199 /** 200 * Retrieve a feed's content to store it into the cache. 201 * @param feedUrl the feed to load. 202 * @param httpClient the client to use to load the feed 203 * @param logger the logger 204 * @return the feed content. 205 */ 206 protected static FeedResult loadFeed(String feedUrl, HttpClient httpClient, Logger logger) 207 { 208 FeedResult result = new FeedResult(); 209 210 result.setCreationDate(ZonedDateTime.now()); 211 212 try 213 { 214 HttpGet feedSource = new HttpGet(feedUrl); 215 216 httpClient.execute(feedSource, response -> { 217 int statusCode = response.getCode(); 218 if (statusCode != 200) 219 { 220 result.setStatus(FeedResult.STATUS_ERROR); 221 result.setMessageError("Unable to join the RSS feed : " + feedUrl + ". HTTP response code is " + statusCode + "."); 222 logger.error("Unable to join the RSS feed : " + feedUrl + ". HTTP response code is " + statusCode + "."); 223 return result; 224 } 225 226 try (HttpEntity entity = response.getEntity(); 227 InputStream is = entity.getContent(); 228 Reader reader = new XmlReader(is)) 229 { 230 SyndFeedInput input = new SyndFeedInput(); 231 SyndFeed synFeed = input.build(reader); 232 result.setStatus(FeedResult.STATUS_OK); 233 result.setResponse(synFeed); 234 235 return result; 236 } 237 catch (IllegalArgumentException | FeedException e) 238 { 239 throw new IOException("Unable to parse the feed due to previous exception", e); 240 } 241 }); 242 243 } 244 catch (Exception e) 245 { 246 result.setStatus(FeedResult.STATUS_ERROR); 247 result.setMessageError(e.getLocalizedMessage()); 248 logger.error("Unable to read the RSS feed to the url : " + feedUrl, e); 249 } 250 251 return result; 252 } 253 254 public void dispose() 255 { 256 _httpClient.close(CloseMode.GRACEFUL); 257 } 258}