# Grading Guidelines

- Full points for correct implementations, even with errors in earlier code/inputs.
- Each repeated mistake (e.g., wrong axis summation) loses points every time.
- Graders may deviate from scheme when:
  - Student demonstrates understanding despite mistakes (positive)
  - Limited understanding isn't fully reflected in scoring (negative)
- Grading platform:
  - Students will see grading scheme during rebuttal. Keep reviews minimal to allow scheme changes without reworking comments. Use negative points for mostly correct solutions and positive points for mostly incorrect ones.
  - Format task reviews as (triple ticks to keep newlines despite Markdown rendering):

    \`\`\`
    ```
    I) OK
    IIa) -1 because of X
    [...]

    <general comments>
    ```
    \`\`\`
  - Manually verify perfect autograder scores
  - Include comments in **all** review cells (including empty or perfect solutions)
- Modulating exam difficulty is only done via grading scale adjustment. Keep rewarding/penalizing difficult exam parts. 

## Typos/unneeded code:
- -0P for:
  - Readable code despite PEP8 violations
  - Clear typos (e.g., code generally readable and variable named correctly everywhere except once)
- -0.5P for:
  - Unused variables
  - Lingering `raise NotImplementedError()`
  - Unhelpful commented code (max once per task)
  - Unreadable code
- -1P for:
  - Excessive commented code hindering readability
  - Significantly unreadable code
  - Significant issues in running the code

<hr>

# Exercise 1: Coding Exercise

| Module | Allowed |
| - | - |
| `numpy` | **No** |
| `matplotlib` | **No** |
| `torch`      | **No** |

In this exercise, you should write code given an explanation and exemplary output.
It is insufficient to hard-code your solution to **only** work for the provided examples.


## Task 1.1 (25 points)

### Sensor Data Analysis

#### Expected Behavior

Implement a generator function `sensor_data`:
- The function does not take any input arguments.
- The function returns a generator that yields integers from `0` to `9`, representing sensor readings.

Implement a transformation function `double_even`:
- The function takes an integer `x` representing a sensor reading.
- The function returns `2 * x` if `x` is even.
- The function returns `None` if `x` is odd.

Implement a processing function `process_sensor_stream`:
- The function takes as input:
  - `data_gen`: A generator yielding sensor readings (e.g., from `sensor_data`).
  - `transforms`: An optional list of transformation functions (e.g., `double_even`).
    Each function takes a reading as input and returns a modified reading or `None`.
- The function returns:
  - A generator yielding transformed readings that successfully pass all transformations (transformations should be applied in the order provided).
  - A function `get_processed_count()` that returns the number of successfully processed readings.
- The `process_sensor_stream` function should:
  - Apply the transformations to each reading in sequence.
  - Skip readings that return `None` from any transformation.
  - Keep track of the number of successfully processed readings.

#### Remarks
- **Implement `get_processed_count` as a closure**. A closure is an inner function that retains access to
  variables from its outer function, even after the outer function has finished execution.

  Example of a closure:

  ```python
  def outer():
      x = 42
      def inner():
          print(x)
      return inner

  closure = outer()
  closure()
  ```
- Use the `nonlocal` keyword.



In [None]:

def sensor_data():
    ### BEGIN SOLUTION
    for i in range(10):
        yield i
    ### END SOLUTION


def double_even(x):
    ### BEGIN SOLUTION
    return x * 2 if x % 2 == 0 else None
    ### END SOLUTION


def process_sensor_stream(data_gen, transforms=None):
    ### BEGIN SOLUTION
    transforms = transforms or []
    processed_count = 0

    def pipeline():
        nonlocal processed_count
        for reading in data_gen:
            for transform in transforms:
                reading = transform(reading)
                if reading is None:
                    break
            else:
                processed_count += 1
                yield reading

    def get_processed_count():
        return processed_count

    return pipeline(), get_processed_count
    ### END SOLUTION



In [None]:



# This cell is for grading. Do not change or remove it. If this cell contains code, you can optionally execute it.


In [None]:


"""
Info provided to students on exam day: "Task 1: We recommend `sensor_data` to yield a finite sequence (otherwise, `process_sensor_stream` would never finish). However, an infinite sequence would also give full points."

Total: 25 points

I) `sensor_data` generator: 5P
   a) Uses `yield` to generate numbers (1P)
   b) Correctly generates numbers from 0 to 9 (4P)

II) Transformation function (`double_even`): 5P
   a) Correctly doubles even numbers (2.5P)
   b) Correctly returns `None` for odd numbers (2.5P)

III) Processing function (`process_sensor_stream`): 12P
   a) Applies transformations correctly in sequence (3P)
      - Iterates through transformations correctly (1P)
      - Correctly applies each transformation (1P)
      - Maintains data flow properly (1P)
   b) Properly skips `None` values and stops processing invalid readings (3P)
      - Identifies and handles `None` values correctly (1.5P)
      - Ensures invalid readings are not processed (1.5P)
   c) Tracks the number of successfully processed readings (4P)
      - Correctly increments count when processing valid readings (2P)
      - Ensures count is correctly maintained throughout execution (2P)
   d) Returns both a generator and a callable function (2P)
      - Ensures the generator yields correct values (1P)
      - Implements `get_processed_count` function properly (1P)

IV) Use of closures for `get_processed_count`: 3P
   a) Implements `get_processed_count` as an inner function (1.5P)
   b) Uses `nonlocal` to correctly track processed readings (1.5P)
"""


<hr>

# Exercise 2: Acceleration

| Module       | Allowed |
|--------------| - |
| `numpy`      | **Yes** |
| `matplotlib` | **No** |
| `torch`      | **No** |

**Implement** the accelerated counterparts of the functions `slow`, which produce the same output but are as compute- and memory-efficient as possible.
Please remember the disallowed routines specified in the header, e.g., do not use `np.vectorize`.
It is sufficient to only consider input shapes that let the respective `slow` function execute without error.


## Task 2.1 (25 points)


In [None]:

import numpy as np
import math

def slow_modified_cross_entropy_loss(W: np.ndarray, b: float, X: np.ndarray, y_true_one: np.ndarray, y_true_two: np.ndarray) -> float:
    loss = 0.0
    n, m = X.shape

    for i in range(n):
        z = b
        for j in range(m):
            z += W[j] * X[i, j]
        y_pred = 1 / (1 + math.exp(-z))
            
        if z >= 0:
            y_label = y_true_one[i]
            loss += y_label * math.log(y_pred)
        else:
            y_label = y_true_two[i]
            loss += y_label * math.log(y_pred) + (1 - y_label) * math.log(1 - y_pred)

    return -loss / n




In [None]:

import numpy as np

@no_loops_allowed
def fast_modified_cross_entropy_loss(W: np.ndarray, b: float, X: np.ndarray, y_true_one: np.ndarray, y_true_two: np.ndarray) -> float:
    ### BEGIN SOLUTION
    linear_combination = np.dot(X, W) + b
    y_pred = 1 / (1 + np.exp(-linear_combination))
    
    y_label = np.where(linear_combination >= 0, y_true_one, y_true_two)
    
    positive_loss = y_label * np.log(y_pred)
    negative_loss = (1 - y_label) * np.log(1 - y_pred)
    
    total_loss = positive_loss + np.where(linear_combination >= 0, 0, negative_loss)
    mean_loss = -np.mean(total_loss)
    
    return mean_loss
    ### END SOLUTION



In [None]:


# This cell is for grading. Do not change or remove it. If this cell contains code, you can optionally execute it.


In [None]:


"""
Total: 25 points

I) Computing linear combination and sigmoid (8.5P)
    a) 3.5P for correctly computing linear combination
    b) 5P for applying the sigmoid function correctly
II) Computing conditional label selection (6.5P)
    a) 3P for using np.where correctly
        Note: task can be solved without np.where via masks.
    b) 3.5P for selecting appropriate labels
III) Computing loss (10P)
    a) 3.5P for correctly computing positive loss
    b) 3.5P for correctly computing negative loss
    c) 3P for averaging the total loss correctly
"""


<hr>

# Exercise 3: Machine Learning Implementation

| Module | Allowed |
| - | - |
| `torch` | **Yes** |
| `numpy` | **No** |
| `matplotlib` | **Yes** |

This exercise considers diffusion models.
We will train a **timestep predictor** that estimates the most likely timestep of a noisy sample $x_t$.
This can enhance the denoising (reverse) diffusion path, e.g., the model can modulate the timestep progression to correct its predictions (moving "backwards in time" in the extreme case).

Note: The task involves two timestep formats:
- **Integer Time:** $[0, \dots, T-1]$ (unnormalized), where $T$ is the number of timesteps.
- **Float Time:** $[0, 1]$ (normalized)

The `helper.py` module provides various helper functions and classes.
For example, it defines the data as a 3×3 pixel grid with one white pixel and eight black pixels.


## Task 3.1 (25 points)
Train the `TimestepModel` for `30` epochs:

1. **Training loop**:
   - Shuffle the dataset at the beginning of each epoch.
   - Iterate over mini-batches.
   - For each batch:
     - Loop over single integer timesteps `t`. Each valid (given the current diffusion process) timestep should be considered exactly once per batch. Iterate over these timesteps in random order.
     - For every `t`:
        - Use the diffusion process to generate a noisy version of the batch.
        - Predict `t` using the `TimestepModel` using the noisy batch as input.
        - Compute the loss using Mean Squared Error (MSE).
        - Update model parameters using the Adam optimizer.

2. **Model selection**:
   - Track the best-performing model based on the lowest training epoch-loss.
   - Return the best model.

Remarks:
- Use the `ToyDiffusion` class (from `helper.py`) to generate noisy samples.
- You may use `torch.randperm`, `torch.nn.functional`, and `torch.optim`.
- Remember to write compute/memory efficient code (e.g., do not use excessive RAM).


In [None]:

import copy

import torch
import torch.nn.functional as F
from helper import TimestepModel, ToyDiffusion, generate_dataset


def train_timestep_model(
    timestep_model: TimestepModel,
    dataset: torch.Tensor,
    toy_diffusion: ToyDiffusion,
    batch_size: int = 10,
    epochs: int = 100,
) -> TimestepModel:
    """Train the timestep predictor model.

    Args:
        timestep_model (TimestepModel): The neural network model predicting timesteps.
        dataset (torch.Tensor): The dataset used for training.
        diffusion_process (ToyDiffusion): The diffusion process used to generate noisy samples.
        batch_size (int, optional): The size of training mini-batches. Defaults to 10.
        epochs (int, optional): The number of training epochs. Defaults to 100.

    Returns:
        TimestepModel: The best-performing model based on training loss.
    """
    optimizer = torch.optim.Adam(timestep_model.parameters(), lr=0.01)

    ### BEGIN SOLUTION
    # loop over epochs
    best_loss = float("inf")
    best_timestep_model = copy.deepcopy(timestep_model)  # for return type (it's fine if students skip this)
    for epoch in range(epochs):
        losses = []
        # Shuffle dataset
        perm = torch.randperm(len(dataset))

        for i in range(0, len(dataset), batch_size):
            # sample a batch
            batch_indices = perm[i : i + batch_size]
            target_batch = dataset[batch_indices]

            # loop over time steps
            for t in torch.randperm(toy_diffusion.timesteps):
                noisy_batch, noise_batch = toy_diffusion.get_noisy_sample(target_batch, t)
                pred_time_batch = timestep_model(noisy_batch)  # (batch_size,)
                loss = F.mse_loss(
                    pred_time_batch,  # (batch_size, )
                    # students need to convert to correct type (torch, not python float))
                    torch.tensor([toy_diffusion.normalize_time(t)] * len(target_batch), dtype=torch.float),  # (batch_size,)
                )
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                losses.append(loss.item())

        # compute average loss of epoch
        epoch_loss = torch.mean(torch.tensor(losses))

        # update best model
        if epoch_loss < best_loss:
            best_loss = epoch_loss.item()
            best_timestep_model = copy.deepcopy(timestep_model)

        print(f"epoch: {epoch}, loss: {torch.mean(torch.tensor(losses))}")

    return best_timestep_model
    ### END SOLUTION




In [None]:


torch.manual_seed(42)

model = train_timestep_model(
    timestep_model=TimestepModel(),
    dataset=generate_dataset(100),
    toy_diffusion=ToyDiffusion(),
    batch_size=10,
    epochs=30,
)


# This cell is for grading. Do not change or remove it. If this cell contains code, you can optionally execute it.


In [None]:

"""
Info provided to students on exam day (March 6th 2025): "Task 3: using numpy is OK, if you make the surrounding code work (e.g., for shuffling), albeit a torch solution would be simpler"

Notes:
- Using correct torch code without syntactic sugar (e.g.,`model.forward(x)` instead of `model (x)`) gives full points.
- The preliminary grading scheme was accidentally included in the exam version distributed to students. We use a slightly optimized grading scheme for grading.

Total: 25 points

I) Data handling and batching: 5P
    Note: Using a (torch or custom) dataloader gives full points.
   a) Correctly iterate over dataset (1P)
   b) Shuffle dataset at start of each epoch (2P)
   c) Correctly batch the dataset (2P)

II) Time step sampling and noise generation: 6P
   a) Iterate over timesteps (1P)
   b) Use `ToyDiffusion` to get timesteps (1P)
   c) Shuffle timesteps (2P)
   d) Use a function to generate a noisy sample (1P)
   e) Use `ToyDiffusion` to generate noisy samples (1P)

III) Loss computation: 5P
    Note: manual computation of MSE OK
   a) Compute predicted timestep (1P)
   b) Compute MSE loss between predicted and true timesteps (2P)
   c) Ensure proper timestep tensor shape (1P)
   d) Use correct normalization for timesteps (1P)

IV) Model training and optimization: 4P
   a) Properly call `optimizer.zero_grad()` before backpropagation (1P)
   b) Perform loss backward pass correctly (`loss.backward()`) (1P)
   c) Update model parameters using `optimizer.step()` (1P)
   d) Correctly iterate over epochs (1P)

V) Model selection and best model tracking: 5P
   a) Track best model based on lowest epoch loss (2P)
   b) Track best loss (1P)
   c) Ensure best model updates correctly (1P)
    Note: creative solutions (using torch.save/load, pickle save/load, etc.) gives full points.
   d) Return the best-trained model at end of training (1P)
"""


## Task 3.2 (25 points)
### Diffusion Process
Note: the formulas below assume integer time $t \in [0, T-1]$.

The diffusion process follows the equation
$$
    x_t = \sqrt{\bar{\alpha}_t} x_0 + \sqrt{1 - \bar{\alpha}_t} \epsilon,
$$
where:
- $x_t$ is the noisy version of the data at time $t$,
- $x_0$ is the original clean sample,
- $\bar{\alpha}_t$ is the accumulated product of the noise schedule,
- $\epsilon \sim \mathcal{N}(0, I)$ is Gaussian noise.

We can compute $x_{t-1}$ (the previous step) given $x_t$ as:

$$
    x_{t-1} = \frac{1}{\sqrt{\alpha_t}} \left( x_t - \frac{\beta_t}{\sqrt{1 - \bar{\alpha}_t}} \cdot \epsilon_{\theta}(x_t, t) \right)
    + \sqrt{\beta_t \cdot \frac{1 - \bar{\alpha}_{t-1}}{1 - \bar{\alpha}_t}} \cdot \epsilon
$$

Key terms in the noise schedule:
- **$\beta_t$:** Controls the amount of noise added at each step.
- **$\alpha_t$:** Defined as $\alpha_t = 1 - \beta_t$, ensuring that some original information is retained.
- **$\bar{\alpha}_t$:** The accumulated product of $\alpha_t$ over time, determining how much of the original data remains.
- **$\epsilon_{\theta}(x_t, t)$:** Predicted noise.
- $\epsilon \sim \mathcal{N}(0, I)$: Gaussian noise.

### Task
Implement a subclass of `ToyDiffusion` that performs iterative denoising based on predicted timesteps:

1. **Subclass `ToyDiffusion`**:
   - Create a class `ToyDenoiser` that inherits from `ToyDiffusion`.
   - Add an additional input parameter `time_model` for the timestep predictor.

2. **Implement the `denoise` method**:
   - Iteratively predict the timestep for noisy samples.
   - Predict the respective noise.
   - Apply the reverse diffusion formula to reconstruct the sample. Only add random noise (from a Standard Gaussian, not the predicted noise) when the predicted timestep is greater than 1.
   - Continue until the predicted integer (=unnormalized) timestep is 0 or 1 (convergence).
   - Return the denoised sample, the number of steps taken, and the predicted (unnormalized) timesteps.
   - Include a `max_steps` (set to `100`) parameter to limit the number of iterations.

The cell below uses pre-trained noise and timestep predictors (already available in this directory). **Your solution must work with these models.** You may optionally **additionally** test with your solution from task 1, but please comment the respective code before submission.

Remark:
- It is OK if your solution only works for a single sample (no batch dimension) as in the below example code.


In [None]:

import torch
from helper import NoiseModel, TimestepModel, ToyDiffusion, generate_dataset


### BEGIN SOLUTION
class ToyDenoiser(ToyDiffusion):
    """Enhanced diffusion model that uses a timestep predictor to guide the denoising process."""

    def __init__(
        self,
        noise_model: NoiseModel,
        time_model: TimestepModel,
        timesteps: int = 10,
    ):
        super().__init__(noise_model, timesteps)
        self.time_model = time_model

    def denoise(self, x: torch.Tensor, max_steps: int = 100) -> tuple[torch.Tensor, int, list[int]]:
        """Adaptively denoises images by iteratively predicting and removing noise based on estimated timesteps."""
        step = 0
        times = []
        for step in range(max_steps):
            pred_float_time = self.time_model(x)
            pred_noise = self.noise_model(x, pred_float_time)

            pred_int_time = self.unnormalize_time(pred_float_time)
            times.append(pred_int_time)

            beta = self.betas[pred_int_time]
            alpha = self.alphas[pred_int_time]
            alpha_bar = self.alphas_bar[pred_int_time]

            x = (x - beta / torch.sqrt(1 - alpha_bar) * pred_noise) / torch.sqrt(alpha)
            if pred_int_time > 1:
                x += torch.sqrt(beta * (1 - self.alphas_bar[pred_int_time - 1]) / (1 - alpha_bar)) * torch.randn_like(
                    x
                )
            else:
                break

        return x.detach().squeeze(), step + 1, times


### END SOLUTION




In [None]:

from matplotlib import pyplot as plt

torch.manual_seed(42)

# models already available (do not overwrite the respective files)
noise_model = torch.load("noise_predictor_solution.pt", map_location="cpu")
time_model = torch.load("timestep_predictor_solution.pt", map_location="cpu")

denoiser = ToyDenoiser(noise_model=noise_model, time_model=time_model)
dataset = generate_dataset(1)
original_sample = dataset[0]


def noise_and_denoise(t: int):
    print(f"Sanity check for t={t}")
    torch.manual_seed(t + 42)  # Set seed inside function to ensure same noise pattern for each test
    noisy_sample, _ = denoiser.get_noisy_sample(original_sample, t)
    
    torch.manual_seed(t + 100)  # Reset seed for consistent denoising
    denoised_sample, n_steps, times = denoiser.denoise(noisy_sample.clone(), max_steps=100)
    print(f"Converged in {n_steps} steps")

    fig, ax = plt.subplots(1, 3, figsize=(9, 3))
    ax[0].imshow(original_sample, cmap="gray")
    ax[0].set_title("Original Sample")
    ax[1].imshow(noisy_sample.squeeze(), cmap="gray")
    ax[1].set_title("Noisy Sample")
    ax[2].imshow(denoised_sample.squeeze(), cmap="gray")
    ax[2].set_title("Denoised Sample")
    plt.tight_layout()
    plt.show()

    plt.figure(figsize=(10, 4))
    plt.plot(times)
    plt.title(f"Timestep Evolution (starting from t={t})")
    plt.xlabel("Denoising Step")
    plt.ylabel("Predicted Timestep")
    plt.grid(True)
    plt.show()


noise_and_denoise(4)  # mid-range noise
noise_and_denoise(9)  # high noise (original sample still slightly visible, i.e., reconstruction of original sample may be possible, which would not reliably be the case from random noise)


# This cell is for grading. Do not change or remove it. If this cell contains code, you can optionally execute it.


In [None]:

"""
Info provided to students on exam day (March 6th 2025): "Task 3: using numpy is OK, if you make the surrounding code work (e.g., for shuffling), albeit a torch solution would be simpler"

Note: Using correct torch code without syntactic sugar (e.g.,`model.forward(x)` instead of `model (x)`) gives full points.

Total: 25P

I) Implementing `ToyDenoiser`: 25P
   a) 5P: Correctly subclass `ToyDiffusion` and initialize with `time_model`.
      1) 1P for inputs to __init__ (`self`, `noise_model`, `time_model`). 
         - Did not take off for not including `timesteps` since we only use the default value in the testing code
      2) 1P initialize `time_model` attribute
      3) 1P for `class ToyDenoiser(ToyDiffusion):`
      4) 1P for `super().__init__`
      5) 1P for `noise_model` and `timesteps` arguments to `super().__init__`
         - since testing code only uses keyword arguments, gave credit if students properly used `*args`, `**kwargs`
   b) 20P: `denoise` method
      1) 2P: Define denoise method
         - 0.5 for defining as method of `ToyDenoiser`
         - 0.5 for `self` input
         - 0.5 each for noisy sample and `max_steps` input
      2) 6P: Use pre-trained noise and timestep predictors
         - 1P each for predicting timesteps and noise
         - 1P each for accessing beta, alpha, alpha_bar values
         - 1P for unnormalizing time
      3) 7P: Correctly apply reverse diffusion updates using the predicted noise and timestep.
         - 1P for condition if predicted timestep > 1
         - 3P for first term
         - 3P for second term
      4) 2P: Implement stopping criteria (`pred_time == 0 or 1` and `max_steps`).
      5) 3P: Return denoised sample, number of steps taken, and predicted timesteps
"""
