Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -891,22 +891,39 @@ public void makeDirs( String fileSystem, String path )
@Override
public List<Reclaim> listOrphanedFiles( int limit )
{
// timestamp data type is encoded as the number of milliseconds since epoch
Date cur = new Date();
int partition = getHoursInDay( cur );
long threshold = getReclaimThreshold( cur, config.getGCGracePeriodInHours() );
ResultSet result;
ArrayList<Reclaim> ret = new ArrayList<>();
String baseQuery = "SELECT * FROM " + keyspace + ".reclaim WHERE partition = ? AND deletion < ?";
if ( limit > 0 )
{
result = session.execute( baseQuery + " limit ?;", partition, threshold, limit );
}
else
PreparedStatement stmt = session.prepare( baseQuery + ( limit > 0 ? " limit ?" : "" ) + ";" );

for ( int partition = 0; partition < 24; partition++ )
{
result = session.execute( baseQuery + ";", partition, threshold );
try
{
BoundStatement bound;
Comment thread
jdcasey marked this conversation as resolved.
if ( limit > 0 )
{
int remaining = limit - ret.size();
if ( remaining <= 0 )
{
break;
}
bound = stmt.bind( partition, threshold, remaining );
}
else
{
bound = stmt.bind( partition, threshold );
}
ResultSet result = executeSession( bound );
Result<DtxReclaim> dtxReclaims = reclaimMapper.map( result );
ret.addAll( dtxReclaims.all() );
}
catch ( Exception e )
{
logger.warn( "Failed to query reclaim partition {}: {}", partition, e.getMessage() );
}
}
Result<DtxReclaim> dtxReclaims = reclaimMapper.map( result );
ArrayList<Reclaim> ret = new ArrayList<>( dtxReclaims.all() );
logger.info( "List orphaned files, cur: {}, threshold: {}, limit: {}, size: {}", cur, new Date( threshold ), limit, ret.size() );
return ret;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static String getSchemaCreateTablePathmap( String keyspace )
+ "filestorage varchar,"
+ "checksum varchar,"
+ "PRIMARY KEY ((filesystem, parentpath), filename)"
+ ");";
+ ") WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': '160'};";
}

public static String getSchemaCreateTableReversemap( String keyspace )
Expand All @@ -79,7 +79,7 @@ public static String getSchemaCreateTableReversemap( String keyspace )
+ "fileid varchar,"
+ "paths set<text>,"
+ "PRIMARY KEY (fileid)"
+ ");";
+ ") WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': '160'};";
}

public static String getSchemaCreateTableReclaim( String keyspace )
Expand All @@ -91,7 +91,8 @@ public static String getSchemaCreateTableReclaim( String keyspace )
+ "checksum varchar,"
+ "storage varchar,"
+ "PRIMARY KEY (partition, deletion, fileid)"
+ ");";
+ ") WITH compaction = {'class': 'TimeWindowCompactionStrategy', 'compaction_window_unit': 'HOURS', 'compaction_window_size': '4'}"
+ " AND gc_grace_seconds = 14400;";
}

public static String getSchemaCreateTableFileChecksum( String keyspace )
Expand All @@ -101,7 +102,7 @@ public static String getSchemaCreateTableFileChecksum( String keyspace )
+ "fileid varchar,"
+ "storage varchar,"
+ "PRIMARY KEY (checksum)"
+ ");";
+ ") WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': '160'};";
}

// Since Date.getHours is deprecated, we use this to replace it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class PathMappedFileManager implements Closeable

private String deduplicatePattern;

private int consecutiveGcFailures = 0;

public PathMappedFileManager( PathMappedStorageConfig config, PathDB pathDB, PhysicalStore physicalStore )
{
this.pathDB = pathDB;
Expand Down Expand Up @@ -388,11 +390,23 @@ public Map<FileInfo, Boolean> gc()
{
try
{
return executeGC();
Map<FileInfo, Boolean> result = executeGC();
consecutiveGcFailures = 0;
return result;
}
catch (Exception e)
{
logger.warn("Storage gc hit a problem but will continue to execute next time", e);
consecutiveGcFailures++;
if ( consecutiveGcFailures >= 3 )
{
logger.error( "Storage gc failed {} consecutive times. Last error: {} - {}. Manual intervention may be required.",
consecutiveGcFailures, e.getClass().getSimpleName(), e.getMessage(), e );
}
else
{
logger.warn( "Storage gc hit a problem (failure {} of 3 before escalation): {} - {}",
consecutiveGcFailures, e.getClass().getSimpleName(), e.getMessage(), e );
}
}
return emptyMap();
}
Expand Down
Loading