Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
'pandas',
'boto3',
's3fs',
'tqdm',
]

[project.urls]
Expand Down
142 changes: 111 additions & 31 deletions src/claimed/components/util/cosutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,59 @@


import logging
import math
import os
import re
import s3fs
import sys
import glob
from tqdm import tqdm
from claimed.c3.operator_utils import explode_connection_string

# Lower bound for the upload block size: _upload never writes with a smaller
# chunk than this, even for tiny files.
MIN_CHUNK_SIZE = 8 * 1024 * 1024 # 8 MiB
# Divisor used by _upload to scale the chunk size with the file size so that
# a single upload needs at most this many parts.
MAX_PARTS = 9500 # S3 hard limit is 10 000; stay safely below


def _upload(s3, local_file, cos_file):
    """Stream one local file to S3/COS while showing byte-level progress.

    The write block size is at least ``MIN_CHUNK_SIZE`` and grows with the
    file size so that the upload needs no more than ``MAX_PARTS`` blocks,
    keeping it below the 10 000-part multipart limit of S3/COS.
    """
    file_name = os.path.basename(local_file)

    # A target without a key part (empty, trailing '/', or no '/' at all, i.e.
    # a bare bucket) is a directory prefix: keep the file's own name under it.
    # The empty string is covered by the "no '/'" test.
    targets_prefix = cos_file.endswith('/') or '/' not in cos_file
    if targets_prefix:
        cos_file = f"{cos_file.rstrip('/')}/{file_name}"

    total_bytes = os.path.getsize(local_file)
    # Scale the block size up for big files so the part count stays bounded.
    block_bytes = max(MIN_CHUNK_SIZE, math.ceil(total_bytes / MAX_PARTS))

    progress = tqdm(
        total=total_bytes,
        unit='B',
        unit_scale=True,
        unit_divisor=1024,
        desc=f'↑ {file_name}',
        leave=True,
    )
    with progress:
        with open(local_file, 'rb') as source:
            with s3.open(cos_file, 'wb', block_size=block_bytes) as target:
                # read() returns b'' at end of file, which ends the iteration.
                for piece in iter(lambda: source.read(block_bytes), b''):
                    target.write(piece)
                    progress.update(len(piece))


def _download(s3, cos_file, local_file):
    """Download a single file from S3/COS with a byte-level progress bar.

    Args:
        s3: filesystem-like object exposing ``info()`` and ``open()``
            (presumably an ``s3fs.S3FileSystem`` - confirm against caller).
        cos_file: remote path of the object to fetch.
        local_file: destination path on the local disk; missing parent
            directories are created first.
    """
    # dirname() is '' for a bare file name; fall back to the current directory.
    os.makedirs(os.path.dirname(local_file) or '.', exist_ok=True)
    size = s3.info(cos_file)['size']
    desc = os.path.basename(cos_file)
    with tqdm(total=size, unit='B', unit_scale=True, unit_divisor=1024,
              desc=f'↓ {desc}', leave=True) as pbar:
        with s3.open(cos_file, 'rb') as f_in, open(local_file, 'wb') as f_out:
            while True:
                # Use the module's MIN_CHUNK_SIZE constant as the read size;
                # the CHUNK_SIZE name referenced before is not defined in
                # this module and raised NameError on the first read.
                chunk = f_in.read(MIN_CHUNK_SIZE)
                if not chunk:
                    break
                f_out.write(chunk)
                pbar.update(len(chunk))

# In[ ]:


Expand Down Expand Up @@ -74,52 +120,86 @@ def run(

if operation == 'mkdir':
s3.mkdir(cos_path)

elif operation == 'ls':
print(s3.ls(cos_path))

elif operation == 'find':
print(s3.find(cos_path))

elif operation == 'put':
print(s3.put(local_path,cos_path, recursive=recursive))
if recursive or os.path.isdir(local_path):
# gather all files under local_path
files = [f for f in glob.glob(
os.path.join(local_path, '**'), recursive=True)
if os.path.isfile(f)]
with tqdm(files, unit='file', desc='Uploading') as pbar:
for f in pbar:
rel = os.path.relpath(f, local_path)
pbar.set_postfix_str(rel)
_upload(s3, f, cos_path.rstrip('/') + '/' + rel)
else:
_upload(s3, local_path, cos_path)

elif operation == 'sync_to_cos':
for file in glob.glob(local_path, recursive=recursive):
logging.info(f'processing {file}')
if s3.exists(cos_path+file):
logging.info(f'exists {file}')
logging.debug(f's3.info {s3.info(cos_path+file)}')
if s3.info(cos_path+file)['size'] != os.path.getsize(file):
logging.info(f'uploading {file}')
s3.put(file, cos_path+file)
files = glob.glob(local_path, recursive=recursive)
with tqdm(files, unit='file', desc='Syncing → COS') as pbar:
for file in pbar:
pbar.set_postfix_str(file)
logging.info(f'processing {file}')
if s3.exists(cos_path + file):
logging.debug(f's3.info {s3.info(cos_path + file)}')
if s3.info(cos_path + file)['size'] != os.path.getsize(file):
logging.info(f'uploading {file}')
_upload(s3, file, cos_path + file)
else:
logging.info(f'skipping {file}')
else:
logging.info(f'skipping {file}')
else:
logging.info(f'uploading {file}')
s3.put(file, cos_path+file)
logging.info(f'uploading {file}')
_upload(s3, file, cos_path + file)

elif operation == 'sync_to_local':
for full_path in s3.glob(cos_path):
local_full_path = local_path+full_path
logging.info(f'processing {full_path}')
if s3.info(full_path)['type'] == 'directory':
logging.debug(f'skipping directory {full_path}')
continue
if os.path.exists(local_full_path):
logging.info(f'exists {full_path}')
logging.debug(f's3.info {s3.info(full_path)}')
if s3.info(full_path)['size'] != os.path.getsize(local_full_path):
logging.info(f'downloading {full_path} to {local_full_path}')
s3.get(full_path, local_full_path)
remote_files = [p for p in s3.glob(cos_path)
if s3.info(p)['type'] != 'directory']
with tqdm(remote_files, unit='file', desc='Syncing → local') as pbar:
for full_path in pbar:
local_full_path = local_path + full_path
pbar.set_postfix_str(os.path.basename(full_path))
logging.info(f'processing {full_path}')
if os.path.exists(local_full_path):
if s3.info(full_path)['size'] != os.path.getsize(local_full_path):
logging.info(f'downloading {full_path} to {local_full_path}')
_download(s3, full_path, local_full_path)
else:
logging.info(f'skipping {full_path}')
else:
logging.info(f'skipping {full_path}')
else:
logging.info(f'downloading {full_path} to {local_full_path}')
s3.get(full_path, local_full_path)
logging.info(f'downloading {full_path} to {local_full_path}')
_download(s3, full_path, local_full_path)

elif operation == 'get':
s3.get(cos_path, local_path, recursive=recursive)
if recursive:
remote_files = [p for p in s3.find(cos_path)
if s3.info(p)['type'] != 'directory']
with tqdm(remote_files, unit='file', desc='Downloading') as pbar:
for rp in pbar:
rel = rp[len(cos_path):].lstrip('/')
lp = os.path.join(local_path, rel)
pbar.set_postfix_str(os.path.basename(rp))
_download(s3, rp, lp)
else:
dest = local_path
if os.path.isdir(local_path):
dest = os.path.join(local_path, os.path.basename(cos_path))
_download(s3, cos_path, dest)

elif operation == 'rm':
s3.rm(cos_path, recursive=recursive)

elif operation == 'glob':
print(s3.glob(cos_path))

else:
logging.error(f'operation unkonwn {operation}')
logging.error(f'operation unknown: {operation}')

# In[ ]:

Expand Down
32 changes: 26 additions & 6 deletions src/claimed/components/util/gpu_performance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
python pytorch_hpc_benchmark.py --mode cpu
"""

import math
import os
import time
import argparse
import shutil
import math
import random

import torch
Expand All @@ -40,6 +40,21 @@
import torch.distributed as dist
from torch.utils.data import Dataset, DataLoader

# =====================
# Helpers
# =====================

def _parse_dim(val):
    """Convert a dimension spec into an int or a tuple of ints.

    A value that is already an ``int`` or a ``tuple`` is returned untouched.
    Anything else is stringified and split on commas: a single field yields
    a plain int, several fields yield a tuple of ints.

    Examples: '1024' -> 1024, '3,224,224' -> (3, 224, 224)
    """
    if not isinstance(val, (int, tuple)):
        dims = tuple(int(token) for token in str(val).split(','))
        if len(dims) == 1:
            return dims[0]
        return dims
    return val


# =====================
# Synthetic Dataset
# =====================
Expand All @@ -58,7 +73,8 @@ def __len__(self):
return self.size

def _generate(self, idx):
    """Create one synthetic (input, label) sample.

    ``idx`` is accepted but not used; every call draws fresh random values.

    Returns:
        A pair ``(x, y)``: ``x`` is a tensor of standard-normal values whose
        shape is given by ``self.input_dim`` (an int or a tuple of ints), and
        ``y`` is a Python int drawn from ``[0, self.num_classes)``.
    """
    # torch.randn takes the sizes as separate arguments, so a scalar dimension
    # is wrapped into a 1-tuple before unpacking. The tensor is sampled exactly
    # once: an earlier extra draw was discarded immediately and only advanced
    # the random generator.
    shape = (self.input_dim,) if isinstance(self.input_dim, int) else self.input_dim
    x = torch.randn(*shape)
    y = torch.randint(0, self.num_classes, (1,)).item()
    return x, y

Expand All @@ -81,8 +97,11 @@ def __getitem__(self, idx):
class SimpleMLP(nn.Module):
def __init__(self, input_dim, hidden_dim, num_classes, depth=3):
super().__init__()
flat = input_dim if isinstance(input_dim, int) else math.prod(input_dim)
layers = []
dim = input_dim
if not isinstance(input_dim, int):
layers.append(nn.Flatten())
dim = flat
for _ in range(depth):
layers.append(nn.Linear(dim, hidden_dim))
layers.append(nn.ReLU())
Expand Down Expand Up @@ -211,7 +230,7 @@ def run(
num_workers: int = 4,
dataset_size: int = 100000,
steps: int = 100,
input_dim: int = 1024,
input_dim: str = '1024',
hidden_dim: int = 2048,
num_classes: int = 10,
depth: int = 3,
Expand All @@ -228,7 +247,7 @@ def run(
num_workers: dataloader worker processes
dataset_size: total number of synthetic samples
steps: number of batches per benchmark phase
input_dim: input feature dimension of the MLP
input_dim: input feature dimension – single int or C,H,W tuple (e.g. '1024' or '3,224,224')
hidden_dim: hidden layer width of the MLP
num_classes: number of output classes
depth: number of hidden layers
Expand All @@ -237,6 +256,7 @@ def run(
matrix_size: square matrix edge length for compute benchmarks
iterations: number of matrix-multiply iterations for compute benchmarks
"""
input_dim = _parse_dim(input_dim)
if mode == 'cpu':
print('CPU GFLOPS:', benchmark_cpu(matrix_size, iterations))
return
Expand Down Expand Up @@ -296,7 +316,7 @@ def main():
parser.add_argument('--num_workers', type=int, default=4)
parser.add_argument('--dataset_size', type=int, default=100000)
parser.add_argument('--steps', type=int, default=100)
parser.add_argument('--input_dim', type=int, default=1024)
parser.add_argument('--input_dim', type=str, default='1024')
parser.add_argument('--hidden_dim', type=int, default=2048)
parser.add_argument('--num_classes', type=int, default=10)
parser.add_argument('--depth', type=int, default=3)
Expand Down
Loading