Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions client_tools/client_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ def generate_models():
GetStateDictResponse,
GetTrainConfigsResponse,
SaveResponse,
TrainingProgressResponse,
)


Expand Down Expand Up @@ -617,6 +618,24 @@ def load(self, name: str, **kwargs) -> None:
)
response.raise_for_status()

def load_training_state(self, name: str, **kwargs) -> Dict[str, Any]:
"""Load optimizer, scheduler, scaler, RNG, and progress metadata from a checkpoint."""
response = http_post(
url=f'{self.server_url}/load_training_state',
json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs}
)
response.raise_for_status()
return TrainingProgressResponse(**response.json()).result

def read_training_progress(self, name: str, **kwargs) -> Dict[str, Any]:
"""Read progress-only checkpoint metadata for resume-only-model flows."""
response = http_post(
url=f'{self.server_url}/read_training_progress',
json_data={'name': name, 'adapter_name': self.adapter_name, **kwargs}
)
response.raise_for_status()
return TrainingProgressResponse(**response.json()).result

def apply_patch(self, patch_cls: str, **kwargs) -> None:
"""Apply a patch to the model."""
response = http_post(
Expand Down
24 changes: 18 additions & 6 deletions cookbook/client/twinkle/self_host/self_congnition.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,29 @@ def train():
# model.set_lr_scheduler('LinearLR')

# Step 6: Optionally resume from a previous checkpoint
consumed_train_samples = 0
global_step = 0
if resume_path:
logger.info(f'Resuming training from {resume_path}')
model.load(resume_path, load_optimizer=True)
logger.info(f'Resuming model weights from {resume_path}')
model.load(resume_path)
trainer_state = model.load_training_state(resume_path)
dataloader.skip_consumed_samples(trainer_state['consumed_train_samples'])
consumed_train_samples = int(trainer_state['consumed_train_samples'])
global_step = int(trainer_state['cur_step'])

# Step 7: Run the training loop
logger.info(model.get_train_configs().model_dump())

for epoch in range(3):
logger.info(f'Starting epoch {epoch}')
for step, batch in enumerate(dataloader):
for _, batch in enumerate(dataloader):
# Forward pass + backward pass (computes gradients)
model.forward_backward(inputs=batch)

# Step
model.clip_grad_and_step()
consumed_train_samples += len(batch)
global_step += 1
# Equal to the following steps:
# # Clip gradients to prevent exploding gradients (max norm = 1.0)
# model.clip_grad_norm(1.0)
Expand All @@ -125,13 +133,17 @@ def train():
# model.lr_step()

# Log the loss every 2 steps (aligned with gradient accumulation)
if step % 2 == 0:
if global_step % 2 == 0:
# Print metric
metric = model.calculate_metric(is_training=True)
logger.info(f'Current is step {step} of {len(dataloader)}, metric: {metric.result}')
logger.info(f'Current is step {global_step} of {len(dataloader)}, metric: {metric.result}')

# Step 8: Save the trained checkpoint
twinkle_path = model.save(name=f'twinkle-epoch-{epoch}', save_optimizer=True)
twinkle_path = model.save(
name=f'twinkle-epoch-{epoch}',
save_optimizer=True,
consumed_train_samples=consumed_train_samples,
)
logger.info(f'Saved checkpoint: {twinkle_path}')

# Step 9: Upload the checkpoint to ModelScope Hub
Expand Down
120 changes: 83 additions & 37 deletions cookbook/transformers/fsdp2.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pathlib import Path

from peft import LoraConfig
from tqdm import tqdm

Expand All @@ -8,77 +10,121 @@
from twinkle.model import TransformersModel
from twinkle.preprocessor import SelfCognitionProcessor

# Construct a device_mesh, fsdp_size=2, dp=4
device_mesh = DeviceMesh.from_sizes(fsdp_size=2, dp_size=4)
# use torchrun mode
twinkle.initialize(mode='local', global_device_mesh=device_mesh)
from resume_utils import resume_from_checkpoint

logger = get_logger()

MODEL_ID = 'ms://Qwen/Qwen3.5-4B'
DATASET_ID = 'ms://swift/self-cognition'
TEMPLATE_NAME = 'Qwen3_5Template'
MODEL_NAME = 'twinkle大模型'
MODEL_AUTHOR = 'ModelScope社区'
FSDP_SIZE = 2
DP_SIZE = 4
BATCH_SIZE = 8
LEARNING_RATE = 1e-4
GRADIENT_ACCUMULATION_STEPS = 2
LOG_INTERVAL = 20
EVAL_INTERVAL = 40
EVAL_SAMPLES = 100
TRAIN_SAMPLES = 1000

OUTPUT_DIR = './output/fsdp2'
RESUME_FROM_CHECKPOINT = None
RESUME_ONLY_MODEL = False
IGNORE_DATA_SKIP = False
ADAPTER_NAME = 'default'

# Construct a device_mesh
device_mesh = DeviceMesh.from_sizes(fsdp_size=FSDP_SIZE, dp_size=DP_SIZE)
# use torchrun mode
twinkle.initialize(mode='local', global_device_mesh=device_mesh)


def eval(model):
# 100 Samples
dataset = Dataset(dataset_meta=DatasetMeta('ms://swift/self-cognition', data_slice=range(100)))
dataset.set_template('Qwen3_5Template', model_id='ms://Qwen/Qwen3.5-4B')
dataset.map(SelfCognitionProcessor('twinkle大模型', 'ModelScope社区'))
def build_dataset(num_samples: int) -> Dataset:
    """Build and encode a self-cognition dataset slice of ``num_samples`` samples.

    Args:
        num_samples: Number of leading samples to take from the dataset.

    Returns:
        The encoded ``Dataset``, ready to be wrapped in a ``DataLoader``.
    """
    dataset = Dataset(dataset_meta=DatasetMeta(DATASET_ID, data_slice=range(num_samples)))
    # The template must be set before encoding so samples are tokenized for the target model.
    dataset.set_template(TEMPLATE_NAME, model_id=MODEL_ID)
    dataset.map(SelfCognitionProcessor(MODEL_NAME, MODEL_AUTHOR))
    dataset.encode()
    return dataset


def save_checkpoint(model: TransformersModel, checkpoint_name: str, consumed_train_samples: int):
    """Persist a resumable checkpoint: weights plus optimizer state and progress metadata."""
    save_kwargs = dict(
        output_dir=OUTPUT_DIR,
        adapter_name=ADAPTER_NAME,
        save_optimizer=True,
        consumed_train_samples=consumed_train_samples,
    )
    model.save(checkpoint_name, **save_kwargs)


def evaluate(model):
    """Run a forward-only pass over the eval split and return the eval metric.

    Args:
        model: Model exposing ``forward_only``/``calculate_loss``/``calculate_metric``.

    Returns:
        The aggregated evaluation metric (as returned by ``calculate_metric``).
    """
    dataloader = DataLoader(dataset=build_dataset(EVAL_SAMPLES), batch_size=BATCH_SIZE)
    for batch in tqdm(dataloader):
        model.forward_only(inputs=batch)
        model.calculate_loss()
    # Metric is aggregated across the whole pass, so fetch it once after the loop.
    return model.calculate_metric(is_training=False)


def train():
    """Train a LoRA adapter on the self-cognition dataset with optional checkpoint resume.

    Builds the dataset/dataloader, attaches a LoRA adapter with optimizer and
    LR scheduler, optionally resumes from ``RESUME_FROM_CHECKPOINT``, then runs
    the training loop with periodic logging, evaluation, and checkpointing.
    """
    dataset = build_dataset(TRAIN_SAMPLES)
    # Global batch size = 8, for GPUs, so 1 sample per GPU
    dataloader = DataLoader(dataset=dataset, batch_size=BATCH_SIZE)
    # Use a TransformersModel
    model = TransformersModel(model_id=MODEL_ID)
    model.model._no_split_modules = {'Qwen3_5DecoderLayer'}

    lora_config = LoraConfig(r=8, lora_alpha=32, target_modules='all-linear')

    # Add a lora to model, with name `default`
    # Comment this to use full-parameter training
    model.add_adapter_to_model(ADAPTER_NAME, lora_config, gradient_accumulation_steps=GRADIENT_ACCUMULATION_STEPS)
    # Add Optimizer for lora `default`
    model.set_optimizer(optimizer_cls='AdamW', lr=LEARNING_RATE)
    # Add LRScheduler for lora `default`
    model.set_lr_scheduler(
        scheduler_cls='CosineWarmupScheduler', num_warmup_steps=5, num_training_steps=len(dataloader))

    consumed_train_samples = 0
    if RESUME_FROM_CHECKPOINT:
        checkpoint_path = Path(RESUME_FROM_CHECKPOINT).expanduser().resolve()
        consumed_train_samples = resume_from_checkpoint(
            model=model,
            dataloader=dataloader,
            checkpoint_path=checkpoint_path,
            resume_only_model=RESUME_ONLY_MODEL,
            ignore_data_skip=IGNORE_DATA_SKIP,
            adapter_name=ADAPTER_NAME,
        )

    logger.info(get_device_placement())
    # Print the training config
    logger.info(model.get_train_configs())
    logger.info(f'Total steps: {len(dataloader)}')
    # Track the optimizer's own step counter so logging/eval stay aligned after a resume.
    optimizer_group = model.optimizer_group[ADAPTER_NAME]
    best_loss = float('inf')
    # lora: 8G * 8
    # full: 18G * 8
    for batch in dataloader:
        # Do forward and backward
        model.forward_backward(inputs=batch)
        # Step
        model.clip_grad_and_step()
        consumed_train_samples += BATCH_SIZE
        cur_step = optimizer_group.cur_step
        if cur_step % LOG_INTERVAL == 0:
            # Print metric
            metric = model.calculate_metric(is_training=True)
            logger.info(f'Current is step {cur_step} of {len(dataloader)}, metric: {metric}')
        if cur_step > 0 and cur_step % EVAL_INTERVAL == 0:
            metrics = evaluate(model)
            logger.info(f'Eval metric: {metrics}')
            metrics['step'] = cur_step
            current_loss = float(metrics['loss'])
            # Keep a best-loss checkpoint in addition to the rolling last-checkpoint.
            if current_loss < best_loss:
                save_checkpoint(model, f'checkpoint-{cur_step}', consumed_train_samples)
                best_loss = current_loss
    save_checkpoint(model, 'last-checkpoint', consumed_train_samples)


if __name__ == '__main__':
Expand Down
55 changes: 55 additions & 0 deletions cookbook/transformers/resume_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path
from typing import Any, Optional

from twinkle import get_logger


logger = get_logger()


def _build_model_kwargs(adapter_name: str) -> dict:
if not adapter_name:
return {}
return {'adapter_name': adapter_name}


def resume_from_checkpoint(
    model: Any,
    dataloader: Any,
    checkpoint_path: Path,
    *,
    resume_only_model: bool,
    ignore_data_skip: bool,
    adapter_name: Optional[str] = None) -> int:
    """Restore training state from a checkpoint directory and fast-forward the dataloader.

    Args:
        model: Model exposing ``load``, ``read_training_progress``,
            ``load_training_state``, and an ``optimizer_group`` mapping.
        dataloader: Dataloader exposing ``skip_consumed_samples``.
        checkpoint_path: Filesystem path of the checkpoint directory.
        resume_only_model: If True, restore model weights only (no optimizer
            state); progress is still restored unless ``ignore_data_skip``.
        ignore_data_skip: With ``resume_only_model``, restart progress from
            step 0 instead of skipping already-consumed samples.
        adapter_name: Adapter whose weights are loaded here. When empty/None,
            no adapter checkpoint is loaded by this helper (full-parameter
            weights are presumably restored by the caller — e.g. by pointing
            the model constructor at the checkpoint).

    Returns:
        int: Number of training samples already consumed (0 when progress is
        intentionally discarded).
    """
    adapter_name = adapter_name or ''
    checkpoint_dir = str(checkpoint_path)
    model_kwargs = _build_model_kwargs(adapter_name)
    if model_kwargs:
        # Load adapter checkpoint.
        model.load(
            name=checkpoint_path.name,
            output_dir=str(checkpoint_path.parent),
            **model_kwargs,
        )

    if resume_only_model:
        # Only load model weights, optionally skip data.
        if ignore_data_skip:
            logger.info('Resumed weights only and restarted progress from step 0.')
            return 0
        progress = model.read_training_progress(checkpoint_dir, **model_kwargs)
        # Skip consumed samples in dataloader and move optimizer to the right step.
        consumed_train_samples = int(progress['consumed_train_samples'])
        dataloader.skip_consumed_samples(consumed_train_samples)
        # NOTE(review): with an empty adapter_name this indexes optimizer_group['']
        # — confirm the model registers an optimizer group under the empty key.
        optimizer_group = model.optimizer_group[adapter_name]
        optimizer_group.cur_step = progress['cur_step']
        optimizer_group.gradient_accumulation_steps = progress['gradient_accumulation_steps']
        logger.info(f'Skipped {consumed_train_samples} consumed samples.')
        return consumed_train_samples

    # Load full training state, including model weights, optimizer states, and training progress.
    trainer_state = model.load_training_state(checkpoint_dir, **model_kwargs)
    consumed_train_samples = int(trainer_state['consumed_train_samples'])
    dataloader.skip_consumed_samples(consumed_train_samples)
    logger.info(f'Restored full training state from step {trainer_state["cur_step"]}.')
    return consumed_train_samples
13 changes: 13 additions & 0 deletions docs/source_en/Components/Model/TransformersModel.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,16 @@ for data in dataloader:
model.forward_backward(...)
model.clip_grad_and_step(..., gradient_accumulation_steps=16)
```

## Checkpoint and Resume

`TransformersModel.save()` can save either weights only or a resumable training checkpoint.

- `model.save(name, save_optimizer=True, consumed_train_samples=...)` saves weights together with optimizer, scheduler, scaler, RNG, and `trainer_state.json`.
- `model.load(name, output_dir=..., adapter_name=...)` restores LoRA / adapter model weights.
- `model.read_training_progress(checkpoint_dir, ...)` reads checkpoint metadata such as `cur_step`, `gradient_accumulation_steps`, and `consumed_train_samples`.
- `model.load_training_state(checkpoint_dir, ...)` restores optimizer-related state and returns the training progress dictionary.

For full-parameter training, restore model weights by constructing `TransformersModel` with the checkpoint path as `model_id`, for example `TransformersModel(model_id='./output/fsdp2/last-checkpoint')`, and then call `load_training_state(...)` to restore optimizer state and training progress.

For end-to-end resume logic, including dataloader skipping, refer to `cookbook/transformers/fsdp2.py` and `cookbook/transformers/resume_utils.py`.
Loading
Loading