001/*
002 *  Copyright 2015 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.plugins.syndication;
017
018import java.io.IOException;
019import java.io.InputStream;
020import java.io.Reader;
021import java.time.Duration;
022import java.time.ZonedDateTime;
023import java.util.Collection;
024import java.util.concurrent.CountDownLatch;
025
026import org.apache.avalon.framework.activity.Disposable;
027import org.apache.avalon.framework.activity.Initializable;
028import org.apache.avalon.framework.component.Component;
029import org.apache.avalon.framework.logger.AbstractLogEnabled;
030import org.apache.avalon.framework.logger.Logger;
031import org.apache.avalon.framework.service.ServiceException;
032import org.apache.avalon.framework.service.ServiceManager;
033import org.apache.avalon.framework.service.Serviceable;
034import org.apache.hc.client5.http.classic.HttpClient;
035import org.apache.hc.client5.http.classic.methods.HttpGet;
036import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
037import org.apache.hc.core5.http.HttpEntity;
038import org.apache.hc.core5.io.CloseMode;
039
040import org.ametys.core.cache.AbstractCacheManager;
041import org.ametys.core.cache.Cache;
042import org.ametys.core.util.HttpUtils;
043import org.ametys.runtime.config.Config;
044import org.ametys.runtime.i18n.I18nizableText;
045
046import com.rometools.rome.feed.synd.SyndFeed;
047import com.rometools.rome.io.FeedException;
048import com.rometools.rome.io.SyndFeedInput;
049import com.rometools.rome.io.XmlReader;
050
051/**
052 * Feed cache, supports preloading multiple feeds in multiple concurrent threads.
053 */
054public class FeedCache extends AbstractLogEnabled implements Component, Initializable, Serviceable, Disposable
055{
056    /** The component role. */
057    public static final String ROLE = FeedCache.class.getName();
058    
059    /** The feed cache id */
060    protected static final String CACHE_ID = ROLE + "$feedCache";
061    
062    /** The cache manager */
063    protected AbstractCacheManager _cacheManager;
064    /** The user information cache. */
065    protected Cache<String, FeedResult> _cache;
066    
067    private CloseableHttpClient _httpClient;
068    
069    public void service(ServiceManager manager) throws ServiceException
070    {
071        _cacheManager = (AbstractCacheManager) manager.lookup(AbstractCacheManager.ROLE);
072    }
073    
074    @Override
075    public void initialize() throws Exception
076    {
077        int timeoutValue = Config.getInstance().getValue("syndication.timeout", false, 2000L).intValue();
078        _httpClient = HttpUtils.createHttpClient(-1, timeoutValue);
079        
080        _cacheManager.createMemoryCache(
081                CACHE_ID,
082                new I18nizableText("plugin.syndication", "PLUGINS_SYNDICATION_FEED_CACHE_LABEL"),
083                new I18nizableText("plugin.syndication", "PLUGINS_SYNDICATION_FEED_CACHE_DESC"),
084                true,
085                Duration.ofDays(1));
086        
087        _cache = _cacheManager.get(CACHE_ID);
088    }
089    
090    /**
091     * Pre-load a collection of feeds, one by thread, to avoid getting timeouts sequentially.
092     * @param feeds the feeds to preload.
093     */
094    public void preload(Collection<String> feeds)
095    {
096        if (getLogger().isDebugEnabled())
097        {
098            getLogger().debug("Preloading " + feeds.size() + " feeds...");
099        }
100        
101        long start = System.currentTimeMillis();
102        
103        // Synchronization helper.
104        CountDownLatch doneSignal = new CountDownLatch(feeds.size());
105        
106        for (String feed : feeds)
107        {
108            // Test if the feed is already in the cache.
109            if (!_cache.hasKey(feed))
110            {
111                // The feed is not in the cache: prepare and launch a thread to retrieve it.
112                new Thread(() -> {
113                    try
114                    {
115                        if (getLogger().isDebugEnabled())
116                        {
117                            getLogger().debug("Preparing to load the URL '" + feed + "'");
118                        }
119                        
120                        long tStart = System.currentTimeMillis();
121                        
122                        // Load the feed and signal that it's done.
123                        _cache.get(feed, u -> FeedCache.loadFeed(u, _httpClient, getLogger()));
124                        
125                        if (getLogger().isDebugEnabled())
126                        {
127                            long tEnd = System.currentTimeMillis();
128                            getLogger().debug("URL '" + feed + "' was loaded successfully in " + (tEnd - tStart) + " millis.");
129                        }
130                    }
131                    catch (Exception e)
132                    {
133                        getLogger().error("An error occurred loading the URL '" + feed + "'", e);
134                    }
135                    finally
136                    {
137                        // Signal that the worker has finished.
138                        doneSignal.countDown();
139                    }
140                }).start();
141            }
142            else
143            {
144                // The feed is already in the cache: count down.
145                doneSignal.countDown();
146            }
147        }
148        
149        try
150        {
151            // Wait for all the URLs to be loaded (i.e. all the threads to end).
152            doneSignal.await();
153            
154            if (getLogger().isDebugEnabled())
155            {
156                long end = System.currentTimeMillis();
157                getLogger().debug("Feed preloading ended in " + (end - start) + " millis.");
158            }
159        }
160        catch (InterruptedException e)
161        {
162            // Ignore, let the threads finish or die.
163        }
164    }
165    
166    /**
167     * Get a feed not from the cache.
168     * @param feedUrl the feed.
169     * @return the feed result.
170     * @throws IOException if an error occurs while loading the feed
171     */
172    public FeedResult getFeedNoCache(String feedUrl) throws IOException
173    {
174        return loadFeed(feedUrl, _httpClient, getLogger());
175    }
176    
177    /**
178     * Get a feed.
179     * @param feedUrl the feed.
180     * @param lifeTime the amount of date or time to be added to the field
181     * @return the feed response.
182     */
183    public FeedResult getFeed(String feedUrl, int lifeTime)
184    {
185        FeedResult feedResult = _cache.get(feedUrl, u -> loadFeed(u, _httpClient, getLogger()));
186        ZonedDateTime dateFeed = feedResult.getCreationDate();
187        
188        if (ZonedDateTime.now().isAfter(dateFeed.plusMinutes(lifeTime)))
189        {
190            _cache.invalidate(feedUrl);
191            return _cache.get(feedUrl, u -> loadFeed(u, _httpClient, getLogger()));
192        }
193        else
194        {
195            return feedResult;
196        }
197    }
198    
199    /**
200     * Retrieve a feed's content to store it into the cache.
201     * @param feedUrl the feed to load.
202     * @param httpClient the client to use to load the feed
203     * @param logger the logger
204     * @return the feed content.
205     */
206    protected static FeedResult loadFeed(String feedUrl, HttpClient httpClient, Logger logger)
207    {
208        FeedResult result = new FeedResult();
209        
210        result.setCreationDate(ZonedDateTime.now());
211        
212        try
213        {
214            HttpGet feedSource = new HttpGet(feedUrl);
215            
216            httpClient.execute(feedSource, response -> {
217                int statusCode = response.getCode();
218                if (statusCode != 200)
219                {
220                    result.setStatus(FeedResult.STATUS_ERROR);
221                    result.setMessageError("Unable to join the RSS feed : " + feedUrl + ". HTTP response code is " + statusCode + ".");
222                    logger.error("Unable to join the RSS feed : " + feedUrl + ". HTTP response code is " + statusCode + ".");
223                    return result;
224                }
225                
226                try (HttpEntity entity = response.getEntity();
227                        InputStream is = entity.getContent();
228                        Reader reader = new XmlReader(is))
229                {
230                    SyndFeedInput input = new SyndFeedInput();
231                    SyndFeed synFeed = input.build(reader);
232                    result.setStatus(FeedResult.STATUS_OK);
233                    result.setResponse(synFeed);
234                    
235                    return result;
236                }
237                catch (IllegalArgumentException | FeedException e)
238                {
239                    throw new IOException("Unable to parse the feed due to previous exception", e);
240                }
241            });
242            
243        }
244        catch (Exception e)
245        {
246            result.setStatus(FeedResult.STATUS_ERROR);
247            result.setMessageError(e.getLocalizedMessage());
248            logger.error("Unable to read the RSS feed to the url : " + feedUrl, e);
249        }
250        
251        return result;
252    }
253    
254    public void dispose()
255    {
256        _httpClient.close(CloseMode.GRACEFUL);
257    }
258}