diff --git a/pyproject.toml b/pyproject.toml index 1fa8d81f..1e9066cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies = [ 'pandas', 'boto3', 's3fs', + 'tqdm', ] [project.urls] diff --git a/src/claimed/components/util/cosutils.py b/src/claimed/components/util/cosutils.py index 9181e16d..8dee5445 100644 --- a/src/claimed/components/util/cosutils.py +++ b/src/claimed/components/util/cosutils.py @@ -18,13 +18,59 @@ import logging +import math import os import re import s3fs import sys import glob +from tqdm import tqdm from claimed.c3.operator_utils import explode_connection_string +MIN_CHUNK_SIZE = 8 * 1024 * 1024 # 8 MiB +MAX_PARTS = 9500 # S3 hard limit is 10 000; stay safely below + + +def _upload(s3, local_file, cos_file): + """Upload a single file to S3/COS with a byte-level progress bar. + + Chunk size is computed dynamically so the number of multipart parts + never exceeds the S3/COS limit of 10 000. + """ + # If cos_file is a bucket root or ends with '/', treat it as a directory prefix + if cos_file == '' or cos_file.endswith('/') or '/' not in cos_file: + cos_file = cos_file.rstrip('/') + '/' + os.path.basename(local_file) + size = os.path.getsize(local_file) + # Ensure chunk size is large enough to stay within the 10 000-part limit + chunk_size = max(MIN_CHUNK_SIZE, math.ceil(size / MAX_PARTS)) + desc = os.path.basename(local_file) + with tqdm(total=size, unit='B', unit_scale=True, unit_divisor=1024, + desc=f'↑ {desc}', leave=True) as pbar: + with open(local_file, 'rb') as f_in, \ + s3.open(cos_file, 'wb', block_size=chunk_size) as f_out: + while True: + chunk = f_in.read(chunk_size) + if not chunk: + break + f_out.write(chunk) + pbar.update(len(chunk)) + + +def _download(s3, cos_file, local_file): + """Download a single file from S3/COS with a byte-level progress bar.""" + os.makedirs(os.path.dirname(local_file) or '.', exist_ok=True) + size = s3.info(cos_file)['size'] + desc = os.path.basename(cos_file) + with tqdm(total=size, unit='B', unit_scale=True, unit_divisor=1024, + desc=f'↓ {desc}', leave=True) as pbar: + with s3.open(cos_file, 'rb') as f_in, open(local_file, 'wb') as f_out: + while True: + chunk = f_in.read(CHUNK_SIZE) + if not chunk: + break + f_out.write(chunk) + pbar.update(len(chunk)) + # In[ ]: @@ -74,52 +120,86 @@ def run( if operation == 'mkdir': s3.mkdir(cos_path) + elif operation == 'ls': print(s3.ls(cos_path)) + elif operation == 'find': print(s3.find(cos_path)) + elif operation == 'put': - print(s3.put(local_path,cos_path, recursive=recursive)) + if recursive or os.path.isdir(local_path): + # gather all files under local_path + files = [f for f in glob.glob( + os.path.join(local_path, '**'), recursive=True) + if os.path.isfile(f)] + with tqdm(files, unit='file', desc='Uploading') as pbar: + for f in pbar: + rel = os.path.relpath(f, local_path) + pbar.set_postfix_str(rel) + _upload(s3, f, cos_path.rstrip('/') + '/' + rel) + else: + _upload(s3, local_path, cos_path) + elif operation == 'sync_to_cos': - for file in glob.glob(local_path, recursive=recursive): - logging.info(f'processing {file}') - if s3.exists(cos_path+file): - logging.info(f'exists {file}') - logging.debug(f's3.info {s3.info(cos_path+file)}') - if s3.info(cos_path+file)['size'] != os.path.getsize(file): - logging.info(f'uploading {file}') - s3.put(file, cos_path+file) + files = glob.glob(local_path, recursive=recursive) + with tqdm(files, unit='file', desc='Syncing → COS') as pbar: + for file in pbar: + pbar.set_postfix_str(file) + logging.info(f'processing {file}') + if s3.exists(cos_path + file): + logging.debug(f's3.info {s3.info(cos_path + file)}') + if s3.info(cos_path + file)['size'] != os.path.getsize(file): + logging.info(f'uploading {file}') + _upload(s3, file, cos_path + file) + else: + logging.info(f'skipping {file}') else: - logging.info(f'skipping {file}') - else: - logging.info(f'uploading {file}') - s3.put(file, cos_path+file) + logging.info(f'uploading {file}') + _upload(s3, file, cos_path + file) + elif operation == 'sync_to_local': - for full_path in s3.glob(cos_path): - local_full_path = local_path+full_path - logging.info(f'processing {full_path}') - if s3.info(full_path)['type'] == 'directory': - logging.debug(f'skipping directory {full_path}') - continue - if os.path.exists(local_full_path): - logging.info(f'exists {full_path}') - logging.debug(f's3.info {s3.info(full_path)}') - if s3.info(full_path)['size'] != os.path.getsize(local_full_path): - logging.info(f'downloading {full_path} to {local_full_path}') - s3.get(full_path, local_full_path) + remote_files = [p for p in s3.glob(cos_path) + if s3.info(p)['type'] != 'directory'] + with tqdm(remote_files, unit='file', desc='Syncing → local') as pbar: + for full_path in pbar: + local_full_path = local_path + full_path + pbar.set_postfix_str(os.path.basename(full_path)) + logging.info(f'processing {full_path}') + if os.path.exists(local_full_path): + if s3.info(full_path)['size'] != os.path.getsize(local_full_path): + logging.info(f'downloading {full_path} to {local_full_path}') + _download(s3, full_path, local_full_path) + else: + logging.info(f'skipping {full_path}') else: - logging.info(f'skipping {full_path}') - else: - logging.info(f'downloading {full_path} to {local_full_path}') - s3.get(full_path, local_full_path) + logging.info(f'downloading {full_path} to {local_full_path}') + _download(s3, full_path, local_full_path) + elif operation == 'get': - s3.get(cos_path, local_path, recursive=recursive) + if recursive: + remote_files = [p for p in s3.find(cos_path) + if s3.info(p)['type'] != 'directory'] + with tqdm(remote_files, unit='file', desc='Downloading') as pbar: + for rp in pbar: + rel = rp[len(cos_path):].lstrip('/') + lp = os.path.join(local_path, rel) + pbar.set_postfix_str(os.path.basename(rp)) + _download(s3, rp, lp) + else: + dest = local_path + if os.path.isdir(local_path): + dest = os.path.join(local_path, os.path.basename(cos_path)) + _download(s3, cos_path, dest) + elif operation == 'rm': s3.rm(cos_path, recursive=recursive) + elif operation == 'glob': print(s3.glob(cos_path)) + else: - logging.error(f'operation unkonwn {operation}') + logging.error(f'operation unknown: {operation}') # In[ ]: diff --git a/src/claimed/components/util/gpu_performance_test.py b/src/claimed/components/util/gpu_performance_test.py index 2af0a811..c5e5ea97 100644 --- a/src/claimed/components/util/gpu_performance_test.py +++ b/src/claimed/components/util/gpu_performance_test.py @@ -27,11 +27,11 @@ python pytorch_hpc_benchmark.py --mode cpu """ +import math import os import time import argparse import shutil -import math import random import torch @@ -40,6 +40,21 @@ import torch.distributed as dist from torch.utils.data import Dataset, DataLoader +# ===================== +# Helpers +# ===================== + +def _parse_dim(val): + """Parse an int or comma-separated ints into an int or tuple. + + Examples: '1024' -> 1024, '3,224,224' -> (3, 224, 224) + """ + if isinstance(val, (int, tuple)): + return val + parts = [int(p) for p in str(val).split(',')] + return parts[0] if len(parts) == 1 else tuple(parts) + + # ===================== # Synthetic Dataset # ===================== @@ -58,7 +73,8 @@ def __len__(self): return self.size def _generate(self, idx): - x = torch.randn(self.input_dim) + shape = (self.input_dim,) if isinstance(self.input_dim, int) else self.input_dim + x = torch.randn(*shape) y = torch.randint(0, self.num_classes, (1,)).item() return x, y @@ -81,8 +97,11 @@ def __getitem__(self, idx): class SimpleMLP(nn.Module): def __init__(self, input_dim, hidden_dim, num_classes, depth=3): super().__init__() + flat = input_dim if isinstance(input_dim, int) else math.prod(input_dim) layers = [] - dim = input_dim + if not isinstance(input_dim, int): + layers.append(nn.Flatten()) + dim = flat for _ in range(depth): layers.append(nn.Linear(dim, hidden_dim)) layers.append(nn.ReLU()) @@ -211,7 +230,7 @@ def run( num_workers: int = 4, dataset_size: int = 100000, steps: int = 100, - input_dim: int = 1024, + input_dim: str = '1024', hidden_dim: int = 2048, num_classes: int = 10, depth: int = 3, @@ -228,7 +247,7 @@ def run( num_workers: dataloader worker processes dataset_size: total number of synthetic samples steps: number of batches per benchmark phase - input_dim: input feature dimension of the MLP + input_dim: input feature dimension – single int or C,H,W tuple (e.g. '1024' or '3,224,224') hidden_dim: hidden layer width of the MLP num_classes: number of output classes depth: number of hidden layers @@ -237,6 +256,7 @@ def run( matrix_size: square matrix edge length for compute benchmarks iterations: number of matrix-multiply iterations for compute benchmarks """ + input_dim = _parse_dim(input_dim) if mode == 'cpu': print('CPU GFLOPS:', benchmark_cpu(matrix_size, iterations)) return @@ -296,7 +316,7 @@ def main(): parser.add_argument('--num_workers', type=int, default=4) parser.add_argument('--dataset_size', type=int, default=100000) parser.add_argument('--steps', type=int, default=100) - parser.add_argument('--input_dim', type=int, default=1024) + parser.add_argument('--input_dim', type=str, default='1024') parser.add_argument('--hidden_dim', type=int, default=2048) parser.add_argument('--num_classes', type=int, default=10) parser.add_argument('--depth', type=int, default=3)