diff --git a/brouter-core/src/main/java/btools/router/ProfileCache.java b/brouter-core/src/main/java/btools/router/ProfileCache.java index 4115004..8ff9e2e 100644 --- a/brouter-core/src/main/java/btools/router/ProfileCache.java +++ b/brouter-core/src/main/java/btools/router/ProfileCache.java @@ -13,16 +13,24 @@ import btools.expressions.BExpressionMetaData; public final class ProfileCache { - private static BExpressionContextWay expctxWay; - private static BExpressionContextNode expctxNode; private static File lastLookupFile; - private static File lastProfileFile; - private static long lastLookupTimestamp; - private static long lastProfileTimestamp; - - private static boolean profilesBusy; + + private BExpressionContextWay expctxWay; + private BExpressionContextNode expctxNode; + private File lastProfileFile; + private long lastProfileTimestamp; + private boolean profilesBusy; + private long lastUseTime; + + private static ProfileCache[] apc = new ProfileCache[1]; + private static boolean debug = Boolean.getBoolean( "debugProfileCache" ); + + public static synchronized void setSize( int size ) + { + apc = new ProfileCache[size]; + } public static synchronized boolean parseProfile( RoutingContext rc ) { @@ -42,21 +50,51 @@ public final class ProfileCache rc.profileTimestamp = profileFile.lastModified() + rc.getKeyValueChecksum()<<24; File lookupFile = new File( profileDir, "lookups.dat" ); + + // invalidate cache at lookup-table update + if ( !(lookupFile.equals( lastLookupFile ) && lookupFile.lastModified() == lastLookupTimestamp ) ) + { + if ( lastLookupFile != null ) + { + System.out.println( "******** invalidating profile-cache after lookup-file update ******** " ); + } + apc = new ProfileCache[apc.length]; + lastLookupFile = lookupFile; + lastLookupTimestamp = lookupFile.lastModified(); + } + + ProfileCache lru = null; + int unusedSlot =-1; // check for re-use - if ( expctxWay != null && expctxNode != null && !profilesBusy ) + for( int i=0; i we overide this one + unusedSlot = -1; + break; } + if ( lru == null || lru.lastUseTime > pc.lastUseTime ) + { + lru = pc; + } + } + else if ( unusedSlot < 0 ) + { + unusedSlot = i; } } @@ -78,22 +116,45 @@ public final class ProfileCache rc.expctxWay.setAllTagsUsed(); } - lastProfileTimestamp = rc.profileTimestamp; - lastLookupTimestamp = lookupFile.lastModified(); - lastProfileFile = profileFile; - lastLookupFile = lookupFile; - expctxWay = rc.expctxWay; - expctxNode = rc.expctxNode; - profilesBusy = true; + if ( lru == null || unusedSlot >= 0 ) + { + lru = new ProfileCache(); + if ( unusedSlot >= 0 ) + { + apc[unusedSlot] = lru; + if ( debug ) System.out.println( "******* adding new profile at idx=" + unusedSlot + " for " + profileFile ); + } + } + + if ( lru.lastProfileFile != null ) + { + if ( debug ) System.out.println( "******* replacing profile of age " + ((System.currentTimeMillis()-lru.lastUseTime)/1000L) + " sec " + lru.lastProfileFile + "->" + profileFile ); + } + + lru.lastProfileTimestamp = rc.profileTimestamp; + lru.lastProfileFile = profileFile; + lru.expctxWay = rc.expctxWay; + lru.expctxNode = rc.expctxNode; + lru.profilesBusy = true; + lru.lastUseTime = System.currentTimeMillis(); return false; } public static synchronized void releaseProfile( RoutingContext rc ) { - // only the thread that holds the cached instance can release it - if ( rc.expctxWay == expctxWay && rc.expctxNode == expctxNode ) + for( int i=0; i { public static final String PROFILE_UPLOAD_URL = "/brouter/profile"; static final String HTTP_STATUS_OK = "200 OK"; @@ -47,8 +48,10 @@ public class RouteServer extends Thread private Socket clientSocket = null; private RoutingEngine cr = null; private volatile boolean terminated; + private long starttime; private static Object threadPoolSync = new Object(); + private static boolean debug = Boolean.getBoolean( "debugThreadPool" ); public void stopRouter() { @@ -300,8 +303,10 @@ public class RouteServer extends Thread serviceContext.sharedProfileDir = tk.hasMoreTokens() ? tk.nextToken() : serviceContext.customProfileDir; int maxthreads = Integer.parseInt( args[4] ); + + ProfileCache.setSize( 2*maxthreads ); - TreeMap threadMap = new TreeMap(); + PriorityQueue threadQueue = new PriorityQueue(); ServerSocket serverSocket = args.length > 5 ? new ServerSocket(Integer.parseInt(args[3]),100,InetAddress.getByName(args[5])) : new ServerSocket(Integer.parseInt(args[3])); @@ -317,43 +322,47 @@ public class RouteServer extends Thread System.out.println( "*** sampling stacks into stacks.txt *** "); } - long last_ts = 0; for (;;) { Socket clientSocket = serverSocket.accept(); RouteServer server = new RouteServer(); server.serviceContext = serviceContext; server.clientSocket = clientSocket; + server.starttime = System.currentTimeMillis(); // kill an old thread if thread limit reached - cleanupThreadList( threadMap ); - - if ( threadMap.size() >= maxthreads ) + cleanupThreadQueue( threadQueue ); + + + if ( debug ) System.out.println( "threadQueue.size()=" + threadQueue.size() ); + if ( threadQueue.size() >= maxthreads ) { synchronized( threadPoolSync ) { // wait up to 2000ms (maybe notified earlier) // to prevent killing short-running threads - threadPoolSync.wait( 2000 ); + long maxage = server.starttime - threadQueue.peek().starttime; + long maxWaitTime = 2000L-maxage; + if ( debug ) System.out.println( "maxage=" + maxage + " maxWaitTime=" + maxWaitTime ); + if ( maxWaitTime > 0 ) + { + threadPoolSync.wait( maxWaitTime ); + } } - cleanupThreadList( threadMap ); - if ( threadMap.size() >= maxthreads ) + cleanupThreadQueue( threadQueue ); + if ( threadQueue.size() >= maxthreads ) { + if ( debug ) System.out.println( "stopping oldest thread..." ); // no way... stop the oldest thread - Long k = threadMap.firstKey(); - RouteServer victim = threadMap.get( k ); - threadMap.remove( k ); - victim.stopRouter(); + threadQueue.poll().stopRouter(); } } - long ts = System.currentTimeMillis(); - while ( ts <= last_ts ) ts++; - threadMap.put( Long.valueOf( ts ), server ); - last_ts = ts; + threadQueue.add( server ); server.start(); + if ( debug ) System.out.println( "thread started..." ); } } @@ -423,16 +432,16 @@ public class RouteServer extends Thread bw.write( "\n" ); } - private static void cleanupThreadList( TreeMap threadMap ) + private static void cleanupThreadQueue( PriorityQueue threadQueue ) { for ( ;; ) { boolean removedItem = false; - for ( Map.Entry e : threadMap.entrySet() ) + for ( RouteServer t : threadQueue ) { - if ( e.getValue().terminated ) + if ( t.terminated ) { - threadMap.remove( e.getKey() ); + threadQueue.remove( t ); removedItem = true; break; } @@ -443,4 +452,11 @@ public class RouteServer extends Thread } } } + + @Override + public int compareTo( RouteServer t ) + { + return starttime < t.starttime ? -1 : ( starttime > t.starttime ? 1 : 0 ); + } + }