WayLinker 2-threaded

This commit is contained in:
Arndt Brenschede 2019-09-28 21:29:14 +02:00
parent 231f55c64d
commit dbf1479da4
2 changed files with 136 additions and 4 deletions

View file

@ -16,6 +16,7 @@ public class WayIterator extends MapCreatorBase
{
private WayListener listener;
private boolean delete;
private boolean descendingSize;
public WayIterator( WayListener wayListener, boolean deleteAfterReading )
{
@ -23,6 +24,12 @@ public class WayIterator extends MapCreatorBase
delete = deleteAfterReading;
}
public WayIterator( WayListener wayListener, boolean deleteAfterReading, boolean descendingSize )
{
this( wayListener, deleteAfterReading );
this.descendingSize = descendingSize;
}
public void processDir( File indir, String inSuffix ) throws Exception
{
if ( !indir.isDirectory() )
@ -33,7 +40,7 @@ public class WayIterator extends MapCreatorBase
File[] af = sortBySizeAsc( indir.listFiles() );
for( int i=0; i<af.length; i++ )
{
File wayfile = af[i];
File wayfile = descendingSize ? af[af.length -1 - i] : af[i];
if ( wayfile.getName().endsWith( inSuffix ) )
{
processFile( wayfile );

View file

@ -32,9 +32,10 @@ import btools.util.LazyArrayOfLists;
*
* @author ab
*/
public class WayLinker extends MapCreatorBase
public class WayLinker extends MapCreatorBase implements Runnable
{
private File nodeTilesIn;
private File wayTilesIn;
private File trafficTilesIn;
private File dataTilesOut;
private File borderFileIn;
@ -66,6 +67,57 @@ public class WayLinker extends MapCreatorBase
private boolean skipEncodingCheck;
private boolean isSlave;
private ThreadController tc;
public static final class ThreadController
{
long maxFileSize = 0L;
long currentSlaveSize;
long currentMasterSize;
synchronized boolean setCurrentMasterSize( long size )
{
try
{
if ( size <= currentSlaveSize )
{
maxFileSize = Long.MAX_VALUE;
return false;
}
currentMasterSize = size;
if ( maxFileSize == 0L )
{
maxFileSize = size;
}
return true;
}
finally
{
notify();
}
}
synchronized boolean setCurrentSlaveSize( long size ) throws Exception
{
if ( size >= currentMasterSize )
{
return false;
}
while ( size + currentMasterSize + 50000000L > maxFileSize )
{
System.out.println( "****** slave thread waiting for permission to process file of size " + size
+ " currentMaster=" + currentMasterSize + " maxFileSize=" + maxFileSize );
wait( 10000 );
}
currentSlaveSize = size;
return true;
}
}
private void reset()
{
minLon = -1;
@ -83,14 +135,39 @@ public class WayLinker extends MapCreatorBase
.println( "usage: java WayLinker <node-tiles-in> <way-tiles-in> <bordernodes> <restrictions> <lookup-file> <profile-file> <data-tiles-out> <data-tiles-suffix> " );
return;
}
new WayLinker().process( new File( args[0] ), new File( args[1] ), new File( args[2] ), new File( args[3] ), new File( args[4] ), new File( args[5] ), new File(
args[6] ), args[7] );
}
public void process( File nodeTilesIn, File wayTilesIn, File borderFileIn, File restrictionsFileIn, File lookupFile, File profileFile, File dataTilesOut,
String dataTilesSuffix ) throws Exception
{
WayLinker master = new WayLinker();
WayLinker slave = new WayLinker();
slave.isSlave = true;
master.isSlave = false;
ThreadController tc = new ThreadController();
slave.tc = tc;
master.tc = tc;
master._process( nodeTilesIn, wayTilesIn, borderFileIn, restrictionsFileIn, lookupFile, profileFile, dataTilesOut, dataTilesSuffix );
slave._process( nodeTilesIn, wayTilesIn, borderFileIn, restrictionsFileIn, lookupFile, profileFile, dataTilesOut, dataTilesSuffix );
Thread m = new Thread( master );
Thread s = new Thread( slave );
m.start();
s.start();
m.join();
s.join();
}
private void _process( File nodeTilesIn, File wayTilesIn, File borderFileIn, File restrictionsFileIn, File lookupFile, File profileFile, File dataTilesOut,
String dataTilesSuffix ) throws Exception
{
this.nodeTilesIn = nodeTilesIn;
this.wayTilesIn = wayTilesIn;
this.trafficTilesIn = new File( "traffic" );
this.dataTilesOut = dataTilesOut;
this.borderFileIn = borderFileIn;
@ -114,13 +191,61 @@ public class WayLinker extends MapCreatorBase
skipEncodingCheck = Boolean.getBoolean( "skipEncodingCheck" );
// then process all segments
new WayIterator( this, true ).processDir( wayTilesIn, ".wt5" );
}
@Override
public void run()
{
try
{
// then process all segments
new WayIterator( this, true, !isSlave ).processDir( wayTilesIn, ".wt5" );
}
catch( Exception e )
{
System.out.println( "******* thread (slave=" + isSlave + ") got Exception: " + e );
throw new RuntimeException( e );
}
finally
{
if (!isSlave)
{
tc.setCurrentMasterSize( 0L );
}
}
}
@Override
public boolean wayFileStart( File wayfile ) throws Exception
{
// master/slave logic:
// total memory size should stay below a maximum
// and no file should be processed twice
long filesize = wayfile.length();
System.out.println( "**** wayFileStart() for isSlave=" + isSlave + " size=" + filesize );
if ( isSlave )
{
if ( !tc.setCurrentSlaveSize( filesize ) )
{
return false;
}
}
else
{
if ( !tc.setCurrentMasterSize( filesize ) )
{
return false;
}
}
File trafficFile = fileFromTemplate( wayfile, trafficTilesIn, "trf" );
if ( trafficTilesIn.isDirectory() && !trafficFile.exists() )
{