AWS Glue DataBrew using AWS CDK

Introduction

This is the second post in a series on Data Engineering with AWS. In this post, I’d like to share my thoughts and key takeaways from experiments I’ve conducted with the AWS Glue DataBrew service - particularly from an Infrastructure as Code (IaC) perspective, as well as building a data pipeline to transform and merge files stored in S3 to make them ready for further analytical processing.

AWS Glue DataBrew is a visual data preparation tool that helps you clean and normalize data without writing dedicated ETL code. It offers over 250 built-in functions to manipulate data - including formatting text, handling dates, masking PII, and more. As a fully serverless service, you pay only for the processing you use. You can run jobs on demand or schedule them to run regularly using the built-in scheduler.

DataBrew comes with two types of jobs:

  1. Profile jobs - jobs that help you understand and validate the quality of your data. You can define a set of rules (e.g. values in humidity column should be between 0 and 100) and add them to a profile job to generate insights and statistics about your data.
  2. Recipe jobs - jobs that help you clean, fix, and transform your data based on either business requirements or profile job results (e.g., masking PII data, replacing null values with empty strings, etc.).

In this post, I’d like to focus on the latter - where I already trust the quality of my data and just need to make slight transformations.

Business requirements

There is a fleet of weather station devices that send measurement results in batches every few hours. Each batch contains several small CSV files. These files are stored in S3 and are partitioned by sensor identifier and ingestion date, for example:

  • data/sensor_001/2025-03-21/result1.csv
  • data/sensor_001/2025-03-21/result2.csv
  • data/sensor_002/2025-03-21/result1.csv and so on.

Each result file follows this structure:

timestamp,temperature,humidity,pressure,battery_level,signal_strength
2025-03-08T08:00:00,21.5,45.2,999.00,98.5,-65
2025-03-08T08:00:15,21.6,45.3,999.15,98.5,-66
2025-03-08T08:00:30,21.7,45.5,999.30,98.4,-65
2025-03-08T08:00:45,21.8,45.8,999.45,98.4,-67

We need a job that runs every 6 hours. This job should merge all small files into one or more Parquet files as needed. It must be resilient and able to select all files ingested since the last successful execution.

The output should be partitioned by measurement date (not ingestion date) only. Additionally, the result files should include the following extra columns:

  • week_number — calculated from the measurement date
  • pressure_label — with values:
    • LOW - if pressure < 1000
    • NORMAL - if pressure is between 1000 and 1020
    • HIGH - if pressure > 1020
  • sensor_id — for identifying the sensor
  • humidity — rounded to the nearest integer

Requirements validation

To build a functional data pipeline with AWS Glue DataBrew, we need to configure the following key building blocks:

  1. Dataset - a read-only data source for the job
  2. Recipe - a sequence of transformation steps that define how the data should be processed
  3. Job - a task that ingests data from the specified dataset, processes it according to the defined recipe, and stores the output in the requested target location

Let's start with reviewing all the requirements to identify potential challenges and see how they can be addressed using the above building blocks, either independently or in combination with other AWS services.

1. Scheduled job running every 6 hours

AWS Glue DataBrew recipe jobs can be run automatically with a scheduler. There are three options available out of the box:

  1. Recurring - defines how frequently we want the job to be executed (e.g., every 4 hours). We can also define which day or days the job should run on
  2. At a particular time - defines the exact time the job should be fired
  3. CRON-based expression - the most flexible option, where the schedule is defined with a CRON expression

Given these options, there should be no problem scheduling the job to run according to our requirements. However, in the worst-case scenario, EventBridge in combination with Step Functions can be leveraged to orchestrate and manage job execution with greater flexibility.
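For example, a CRON-based schedule could be attached to the recipe job with the CfnSchedule L1 construct. This is only a minimal sketch (the job name refers to the job we create later in this post; in the final solution we trigger the pipeline through Step Functions instead):

import * as databrew from 'aws-cdk-lib/aws-databrew'

// run the recipe job at minute 0 of every 6th hour, every day
new databrew.CfnSchedule(this, 'EverySixHoursSchedule', {
  name: 'CleanMergeEverySixHours',
  cronExpression: 'cron(0 */6 * * ? *)',
  // the recipe job defined later in this post
  jobNames: ['CleanMergeWeatherStationsDataJob'],
})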

2. Merging small CSV files into a single Parquet file

The job is responsible for taking files from the given dataset and generating the result. There are a few options available when generating the result if the target is S3.

image

First of all, we need to specify whether we want the results to be generated separately for each job execution or if each consecutive job run should overwrite the previous results. The first option will result in an additional directory, with the directory name containing the job's name and the execution timestamp. Since the job will be executed on a regular basis, we should avoid overwriting existing results. Therefore, the natural choice in our case is to create a separate directory for each execution.

The other option we need to consider is file partitioning. There are three options available:

  1. Automatic - lets the job decide how many files should be generated
  2. Single file output - the job generates a single file
  3. Multiple files - we can specify any number between 2 and 999

Taking into account our requirements (we want to combine small files into a single one), we should consider either option 1 or option 2 for file partitioning. Additionally, since the results should be partitioned by measurement date (we will cover that below), the outputs will follow the format:

s3://output/myjob_timestamp1/measurement_date=2025-03-08/myjob_timestamp1_part001.parquet
s3://output/myjob_timestamp2/measurement_date=2025-03-08/myjob_timestamp2_part001.parquet
s3://output/myjob_timestamp3/measurement_date=2025-03-08/myjob_timestamp3_part001.parquet
s3://output/myjob_timestamp4/measurement_date=2025-03-09/myjob_timestamp4_part001.parquet

Although this structure keeps the data somewhat organized, the object keys are not well-formatted. Querying this data using, for example, Athena will not be efficient if we want to filter measurements for a particular date.

The desired result, and what we should aim for, is:

s3://output/measurement_date=2025-03-08/myjob_timestamp1_part001.parquet
s3://output/measurement_date=2025-03-08/myjob_timestamp2_part001.parquet
s3://output/measurement_date=2025-03-08/myjob_timestamp3_part001.parquet
s3://output/measurement_date=2025-03-09/myjob_timestamp4_part001.parquet

So, it seems that we will still need some post-processing to keep our results organized. This can be accomplished with a Lambda function that moves the results to the desired location. Essentially, it will remove the job name from the result keys.

3. Selecting files since the last successful execution

To connect multiple files to the dataset, we need to combine a few options. Since our raw files are partitioned by sensor ID and ingestion date, there is no fixed path to them. Glue DataBrew supports these requirements using dynamic paths. This means we can define the key using either a parameterized path, a regular expression or both.

For instance, a key like s3://databrew-bucket/data/{parameter1}/file-<.*>.json will match all paths where parameter1 is any String, Date or Integer, and the JSON files match the regular expression.

In our case, we could define the key using the following pattern: data/{sensor_id}/{ingestion_date}/<.*>.csv. This will select all files currently stored in our bucket. However, to achieve true incremental data processing, we also need a way to filter out already processed files.

The downside is that Glue DataBrew does not have a built-in checkpointing mechanism. It does support selecting files that match a key with additional filtering, such as selecting all files modified within the last N minutes or those with a modification date greater than a specified timestamp. However, it does not keep track of the last execution time, meaning it cannot automatically pick up only newly uploaded files since the last execution.

So, to implement a checkpointing mechanism, we would need to track the last successful execution ourselves using either DynamoDB or Parameter Store.
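As a minimal sketch of the Parameter Store option (the parameter name and initial value below are placeholders of my choosing), the checkpoint can be a simple SSM String parameter that the pipeline overwrites after every successful run:

import * as ssm from 'aws-cdk-lib/aws-ssm'

// checkpoint parameter: the timestamp of the last successful pipeline run
const lastExecutionParameter = new ssm.StringParameter(this, 'LastExecutionParameter', {
  parameterName: '/pipelines/weather-stations/last-execution',
  // start far in the past so that the first run picks up all existing files
  stringValue: '1970-01-01T00:00:00Z',
})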

4. Partitioning the output by measurement date (not ingestion date)

Based on the screenshot above, when defining the output for the job result, there is an option to partition the result by specified dataset columns. Since every measurement has a timestamp, we need to convert it to a date using one of the built-in functions.

To be more precise, we will use the DATE_FORMAT function to convert the timestamp into a date in the yyyy-mm-dd format.

5. Enriching data with additional columns

To enrich the data with additional columns, we will use another set of built-in functions:

  • WEEK_NUMBER - to extract the week number from the timestamp
  • ROUND - to round the humidity value
  • CASE_OPERATION - to add the pressure label, plus a few other functions to clean up the result before generating the final file.

Solution

Based on the above analysis, we can use Glue DataBrew to create the pipeline. However, it needs to be supported by a few other services to ensure flexibility and efficiency. Essentially, we will orchestrate the pipeline with Step Functions to implement checkpointing and ensure that the final results are properly partitioned.

Before we go into orchestration, we need to set up fundamental components for our data pipeline.

Dataset

At the time of writing this post, there is only an L1 construct available for creating a dataset. Below is a dataset definition that meets our requirements:

const weatherStationsDataset = new databrew.CfnDataset(this, 'Dataset', {
  name: 'WeatherStationsDataset',
  format: 'CSV', // define input file format
  input: {
    s3InputDefinition: {
      // use the raw data bucket name
      bucket: rawDataBucket.bucketName,
      // and define dynamic key for our data
      key: 'data/{sensor_id}/{ingestion_date}/<.*>.csv', 
    },
  },
  pathOptions: {
    lastModifiedDateCondition: {
      // select only files from a particular date range (this is to support checkpointing)
      // start_date and end_date are parameters that need to be set
      expression: '(AFTER :start_date) AND (BEFORE :end_date)', 
      valuesMap: [
        {
          // set to placeholder values; the proper ones will be set by the Step Function
          valueReference: ':start_date',
          value: '2025-01-30T01:00:00Z',
        },
        {
          valueReference: ':end_date',
          value: '2025-03-16T01:00:00Z',
        },
      ],
    },
    parameters: [
      // define types for the dynamic parameters
      {
        pathParameterName: 'sensor_id',
        datasetParameter: {
          // we will be merging the files, so we need to carry the sensor information over;
          // hence we want a column to be created for the `sensor_id` parameter
          createColumn: true,
          name: 'sensor_id',
          type: 'String',
        },
      },
      {
        pathParameterName: 'ingestion_date',
        datasetParameter: {
          // we do not need the ingestion date for the data set
          createColumn: false,
          name: 'ingestion_date',
          type: 'String',
        },
      },
    ],
  },
})

Recipe

The recipe will require more attention and explanation. When working with recipes in DataBrew, we need to distinguish between two types:

  1. Working/Draft – These are still under construction and not yet ready to be assigned to jobs.
  2. Published – These are created from drafts when a recipe is ready to be used and assigned to a job.

At the time of writing this post, there is only an L1 construct for creating a working version of a recipe. Therefore, to make the recipe available to the job, we will have to create a custom resource that handles publishing. To make our lives easier, we will wrap the L1 construct and the publishing of the working version into a custom construct:

import * as path from 'path'
import * as cdk from 'aws-cdk-lib'
import { CfnTag } from 'aws-cdk-lib'
import * as lambda from 'aws-cdk-lib/aws-lambda'
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs'
import { CfnRecipe } from 'aws-cdk-lib/aws-databrew'
import * as cr from 'aws-cdk-lib/custom-resources'
import { Construct } from 'constructs'

export interface DatabrewRecipeProps {
  /**
   * Recipe name
   */
  readonly name: string
  /**
   * Recipe description
   */
  readonly description?: string

  /**
   * Transformations steps
   */
  readonly steps: CfnRecipe.RecipeStepProperty[]

  /**
   * Tags to describe the pipeline
   */
  readonly tags?: Record<string, string>
}

export class DatabrewRecipe extends Construct {
  public readonly recipe: CfnRecipe
  public readonly publisher: cdk.CustomResource

  constructor(scope: Construct, id: string, props: DatabrewRecipeProps) {
    super(scope, id)

    // L1 construct to create the working version of the recipe
    this.recipe = new CfnRecipe(this, 'Recipe', {
      ...props,
      tags: Object.entries(props.tags || {}).map(([key, value]) => ({ key, value } as CfnTag)),
    })

    // lambda encapsulating the logic to provision published versions
    const handler = new NodejsFunction(this, `${id}Publisher`, {
      runtime: lambda.Runtime.NODEJS_22_X,
      bundling: {
        externalModules: ['@aws-sdk'],
      },
      entry: path.resolve(__dirname, '../../lambdas/databrew-recipe/src/index.ts'),
    })
    // the lambda needs to publish/delete the version (so that we can support stack deletion)
    handler.addToRolePolicy(
      new cdk.aws_iam.PolicyStatement({
        actions: ['databrew:PublishRecipe', 'databrew:BatchDeleteRecipeVersion', 'databrew:ListRecipeVersions'],
        resources: [
          cdk.Arn.format(
            {
              service: 'databrew',
              resource: 'recipe',
              resourceName: props.name,
            },
            cdk.Stack.of(this)
          ),
        ],
      })
    )
    // standard support for custom resources in CDK
    const provider = new cr.Provider(this, 'Provider', {
      onEventHandler: handler,
    })
    this.publisher = new cdk.CustomResource(this, 'Resource', {
      serviceToken: provider.serviceToken,
      // we pass the name and the description to the lambda;
      // the custom resource triggers an update (and thus publishes a new version)
      // only when either the name or the description changes
      properties: {
        RecipeName: props.name,
        Description: props.description,
      },
    })
    // wait with publishing for the working version to be created
    this.publisher.node.addDependency(this.recipe)
  }
}

And the lambda code:

import {
  DataBrewClient,
  PublishRecipeCommand,
  ListRecipeVersionsCommand,
  BatchDeleteRecipeVersionCommand,
} from '@aws-sdk/client-databrew'
import type { CdkCustomResourceEvent, CdkCustomResourceResponse } from 'aws-lambda'

const databrew = new DataBrewClient({})

export async function handler(event: CdkCustomResourceEvent): Promise<CdkCustomResourceResponse> {
  // extract the parameters passed from the construct
  const recipeName = event.ResourceProperties.RecipeName
  const description = event.ResourceProperties.Description
  const physicalResourceId = `${recipeName}`

  try {
    switch (event.RequestType) {
      case 'Create':
      case 'Update':
        // in case of create and update,
        // call a publish command to make new version available
        await databrew.send(
          new PublishRecipeCommand({
            Name: recipeName,
            Description: description,
          })
        )
        // and return `CdkCustomResourceResponse`
        return {
          Status: 'SUCCESS',
          PhysicalResourceId: physicalResourceId,
          Data: {
            RecipeName: recipeName,
          },
          LogicalResourceId: event.LogicalResourceId,
          RequestId: event.RequestId,
          StackId: event.StackId,
        }

      case 'Delete': {
        // get all recipe versions
        const listResponse = await databrew.send(
          new ListRecipeVersionsCommand({
            Name: recipeName,
          })
        )

        const recipes = listResponse.Recipes || []
        if (recipes.length > 0) {
          const versionsToDelete = recipes
            // The LATEST_WORKING cannot be deleted if the recipe has other versions
            // so remove all others and the LATEST_WORKING will be deleted by the L1 construct itself
            .filter(recipe => recipe.RecipeVersion !== 'LATEST_WORKING')
            .filter(recipe => recipe.RecipeVersion !== undefined)
            .map(version => version.RecipeVersion?.toString())

          if (versionsToDelete.length > 0) {
            await databrew.send(
              // delete all versions in batch
              new BatchDeleteRecipeVersionCommand({
                Name: recipeName,
                RecipeVersions: versionsToDelete as string[],
              })
            )
          }
        }
        // return the standard response in case of success
        return {
          Status: 'SUCCESS',
          PhysicalResourceId: event.PhysicalResourceId,
          LogicalResourceId: event.LogicalResourceId,
          RequestId: event.RequestId,
          StackId: event.StackId,
        }
      }
    }
  } catch (error) {
    // return the standard response in case of failure
    return {
      Status: 'FAILED',
      Reason: error instanceof Error ? error.message : String(error),
      PhysicalResourceId: event.LogicalResourceId || physicalResourceId,
      LogicalResourceId: event.LogicalResourceId,
      RequestId: event.RequestId,
      StackId: event.StackId,
    }
  }
}

And here is the recipe definition that creates the required transformations:

import { CfnRecipe } from 'aws-cdk-lib/aws-databrew'

export const cleanMergeWeatherStationsSteps: CfnRecipe.RecipeStepProperty[] = [
  {
    action: {
      // we need measurement date out of the timestamp so that we can properly partition by it
      operation: 'DATE_FORMAT',
      parameters: {
        sourceColumn: 'timestamp',
        dateTimeFormat: 'yyyy-mm-dd',
        targetColumn: 'measurement_date',
      },
    },
  },
  {
    action: {
      // By using the WEEK_NUMBER operation, we add an extra column that indicates the week in which the measurements were taken
      operation: 'WEEK_NUMBER',
      parameters: {
        sourceColumn: 'timestamp',
        targetColumn: 'week_number',
      },
    },
  },
  {
    action: {
      // using sort operation, we make sure that all merged results are properly sorted
      operation: 'SORT',
      parameters: {
        expressions: JSON.stringify([
          { sourceColumn: 'sensor_id', ordering: 'ASCENDING', nullsOrdering: 'NULLS_TOP' },
          { sourceColumn: 'timestamp', ordering: 'ASCENDING', nullsOrdering: 'NULLS_TOP' },
        ]),
      },
    },
  },
  {
    action: {
      // we round the humidity and the new value is put in the extra column
      operation: 'ROUND',
      parameters: {
        sourceColumn: 'humidity',
        targetColumn: 'humidity_round',
      },
    },
  },
  {
    action: {
      // with case operator we add label for the pressure
      operation: 'CASE_OPERATION',
      parameters: {
        valueExpression:
          "CASE WHEN pressure IS NULL THEN 'N/A' WHEN pressure > 1020 THEN 'HIGH' WHEN pressure < 1000 THEN 'LOW' ELSE 'NORMAL' END",
        targetColumn: 'pressure_label',
      },
    },
  },
  {
    action: {
      // and now it is time for cleanup: removing the old humidity value
      operation: 'DELETE',
      parameters: {
        sourceColumn: 'humidity',
      },
    },
  },
  {
    action: {
      // and renaming the column with rounded values
      operation: 'RENAME',
      parameters: {
        sourceColumn: 'humidity_round',
        targetColumn: 'humidity',
      },
    },
  },
]

And putting it all together in the stack:

const cleanMergeRecipe = new DatabrewRecipe(this, 'CleanMergeRecipe', {
  name: 'CleanMergeWeatherStationsData',
  description: 'Clean, transform and merge weather-stations results (v1).',
  tags: {
    Project: 'WeatherStations',
  },
  steps: cleanMergeWeatherStationsSteps,
})

The construct will create and publish the version so that we will be able to assign it to the job.

Job

And the last of the fundamental building blocks. Again, we will use the L1 construct to create the required resource:

// create a job role
const jobRole = new iam.Role(this, 'JobRole', {
  assumedBy: new iam.ServicePrincipal('databrew.amazonaws.com'),
  managedPolicies: [iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueDataBrewServiceRole')],
})
// job should read the raw data bucket
rawDataBucket.grantRead(jobRole)
// and write to a bucket with the results
cleanDataBucket.grantReadWrite(jobRole)

const cleanMergeOnDemandJob = new databrew.CfnJob(this, 'CleanMergeOnDemandJob', {
  name: 'CleanMergeWeatherStationsDataJob',
  type: 'RECIPE',
  datasetName: weatherStationsDataset.name,
  roleArn: jobRole.roleArn,
  timeout: 300,
  // for demo purposes and to keep costs under control
  maxCapacity: 2,
  // define the recipe and use the latest published version
  recipe: {
    name: cleanMergeRecipe.recipe.name,
    version: 'LATEST_PUBLISHED',
  },
  // and define the output based on the requirements above
  outputs: [
    {
      location: {
        bucket: cleanDataBucket.bucketName,
        // store the result in the tmp directory;
        // we will use a post-execution task to move it to the proper directory with the correct partitioning
        key: 'tmp',
      },
      // partition results by measurement_date only (new column created by a RECIPE)
      partitionColumns: ['measurement_date'],
      // define maxOutputFiles to 1
      maxOutputFiles: 1,
    },
  ],
})
// get the job arn (will be used later on)
const cleanMergeOnDemandJobArn = cdk.Arn.format(
  {
    service: 'databrew',
    resource: 'job',
    resourceName: cleanMergeOnDemandJob.name,
    region: this.region,
    account: this.account,
  },
  this
)
// and make sure all resources are created before we create the job
cleanMergeOnDemandJob.addDependency(cleanMergeRecipe.recipe)
cleanMergeOnDemandJob.addDependency(weatherStationsDataset)
cleanMergeOnDemandJob.node.addDependency(cleanMergeRecipe.publisher)

With the fundamental building blocks in place, it's time to orchestrate all the steps with Step Functions. This is what we will be building:

image

Each step will be explained in detail below. The Step Function will be created as a separate construct. It requires a few parameters that we will have to set:

export interface S3DatasetPathParameter {
  readonly name: string
  readonly type?: string
  readonly createColumn?: boolean
}

export interface DatabrewPipelineProps {
  /**
   * Raw data bucket
   */
  readonly rawDataBucket: s3.IBucket

  /**
   * Target bucket
   */
  readonly outDataBucket: s3.IBucket

  /**
   * Key in target bucket for temporary files
   */
  readonly tmpKeyPattern: string

  /**
   * Final destination in target bucket
   */
  readonly outKeyPattern: string

  /**
   * Dynamic key of files in dataset
   */
  readonly rawKeyPattern: string

  /**
   * Name of the SSM parameter used to track the last successful job execution
   */
  readonly lastExecutionParameterName: string

  /**
   * Dataset name
   */
  readonly dataset: string

  /**
   * Definition of dynamic keys
   */
  readonly datasetPathParameters: S3DatasetPathParameter[]

  /**
   * Job name
   */
  readonly job: string

  /**
   * Job role ARN
   */
  readonly jobRoleArn: string

  /**
   * Timeout for the job
   */
  readonly timeout?: number
}

GetLastSuccessfulExecutionTime

Since DataBrew does not have a built-in checkpointing mechanism, we will use Parameter Store to track the last successful execution of the job. The first task is to read the value. For that, we will use the CallAwsService type of task:

const getLastSuccessfulExecutionTime = new tasks.CallAwsService(this, 'GetLastSuccessfulExecutionTime', {
  service: 'ssm',
  action: 'getParameter',
  parameters: {
    Name: props.lastExecutionParameterName,
    WithDecryption: true,
  },
  iamResources: [`arn:aws:ssm:${stack.region}:${stack.account}:parameter${props.lastExecutionParameterName}`],
  resultPath: '$.lastExecutionTimestamp',
})

The value (the SSM GetParameter response) will be available to the next tasks under the lastExecutionTimestamp path.
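For reference, the state at this point looks roughly like this (illustrative values only; the real GetParameter response contains additional fields such as Version and ARN):

// illustrative only: the state object after GetLastSuccessfulExecutionTime
const exampleState = {
  lastExecutionTimestamp: {
    Parameter: {
      Name: '/pipelines/weather-stations/last-execution',
      Type: 'String',
      Value: '2025-03-08T06:00:00Z',
    },
  },
}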

PrepareTimestampsTask

In order to maintain consistency and control over the files processed by a job, we will define a strict range for the selected files. This range is determined by the last successful job execution and the timestamp of the Step Function execution. We will use a Pass type of task to handle that.

const prepareTimestampsTask = new stepfunctions.Pass(this, 'PrepareTimestampsTask', {
  parameters: {
    'last_execution_timestamp.$': '$.lastExecutionTimestamp.Parameter.Value',
    'current_timestamp.$': '$$.Execution.StartTime',
  },
  resultPath: '$.timestamps',
})

We extract both values and make them available under the timestamps.last_execution_timestamp and timestamps.current_timestamp paths.

UpdateDatasetTask

This is the trickiest task. When we defined the dataset in the previous steps, we hardcoded some values. The goal of this task is to set start_date and end_date to the values prepared in the previous step. To achieve that, we again use a CallAwsService task, this time calling the UpdateDataset API.

const datasetArn = cdk.Arn.format(
  {
    service: 'databrew',
    resource: 'dataset',
    resourceName: props.dataset, // our dataset name
    region: stack.region,
    account: stack.account,
  },
  stack
)
const updateDatasetTask = new tasks.CallAwsService(this, 'UpdateDatasetTask', {
  service: 'databrew',
  action: 'updateDataset',
  parameters: {
    // we are repeating the dataset configuration
    Input: {
      S3InputDefinition: {
        Bucket: props.rawDataBucket.bucketName,
        Key: props.rawKeyPattern,
      },
    },
    Name: props.dataset,
    PathOptions: {
      LastModifiedDateCondition: {
        Expression: '(AFTER :start_date) AND (BEFORE :end_date)',
        // use the parameter prepared in the previous step
        ValuesMap: {
          ':start_date': stepfunctions.JsonPath.stringAt('$.timestamps.last_execution_timestamp'),
          ':end_date': stepfunctions.JsonPath.stringAt('$.timestamps.current_timestamp'),
        },
      },
      Parameters: props.datasetPathParameters.reduce(
        (result, param) => ({
          ...result,
          [param.name]: {
            CreateColumn: param.createColumn ?? true,
            Name: param.name,
            Type: param.type ?? 'String',
          },
        }),
        {}
      ),
    },
  },
  iamResources: [datasetArn],
  resultPath: '$.updateResult',
})

The update result will be available under the updateResult path. We do not validate it at the moment, but it would be good to verify whether the call was successful (and fail the Step Function in case of failure).
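For example, we could attach a catch to the task and route errors to a dedicated Fail state (a minimal sketch, not wired into the pipeline above):

// fail the whole state machine if the dataset update throws an error
const updateDatasetFailed = new stepfunctions.Fail(this, 'UpdateDatasetFailed', {
  cause: 'Could not update the DataBrew dataset date range',
})
updateDatasetTask.addCatch(updateDatasetFailed, { resultPath: '$.error' })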

StartDatabrewJob

With the dataset updated, it is time to start the job. We can use the dedicated task (GlueDataBrewStartJobRun):

const startDatabrewJob = new tasks.GlueDataBrewStartJobRun(this, 'StartDatabrewJob', {
  name: props.job,
  resultPath: '$.jobRun',
})

It requires only the job name to be passed in. Once the job has been scheduled, we need to wait for the result. To achieve that, we will use a simple wait/check-job-status loop composed of two tasks.

CheckJobStatus

The goal of this task is to read the current status of the job. We will use a CallAwsService task with the DescribeJobRun API call.

const checkJobStatus = new tasks.CallAwsService(this, 'CheckJobStatus', {
  service: 'databrew',
  action: 'describeJobRun',
  parameters: {
    Name: props.job,
    RunId: stepfunctions.JsonPath.stringAt('$.jobRun.RunId'),
  },
  iamResources: [props.jobRoleArn],
  resultPath: '$.jobStatus',
})

The status is available under the jobStatus path.

JobStatusChoice

Based on the current status, we need to take an action (wait, fail, or clean up the results after a successful job execution). We can utilize the Choice state for that.

const jobStatusChoice = new stepfunctions.Choice(this, 'JobStatusChoice')
  .when(stepfunctions.Condition.stringEquals('$.jobStatus.State', 'STARTING'), waitForJobToComplete)
  .when(stepfunctions.Condition.stringEquals('$.jobStatus.State', 'WAITING'), waitForJobToComplete)
  .when(stepfunctions.Condition.stringEquals('$.jobStatus.State', 'RUNNING'), waitForJobToComplete)
  .when(stepfunctions.Condition.stringEquals('$.jobStatus.State', 'STOPPING'), waitForJobToComplete)
  .when(stepfunctions.Condition.stringEquals('$.jobStatus.State', 'SUCCEEDED'), postExecutionCleanupTask)
  .otherwise(
    new stepfunctions.Fail(this, 'JobFailed', {
      cause: 'DataBrew Job Failed',
    })
  )

Valid values for State are: STARTING | RUNNING | STOPPING | STOPPED | SUCCEEDED | FAILED | TIMEOUT. If the job has not finished yet, we move to the wait task.

WaitForJobToComplete

The goal of this task is to pause execution of the Step Function for some period of time. After that, we move to the CheckJobStatus task again. This task is backed by the Wait state.

const waitForJobToComplete = new stepfunctions.Wait(this, 'WaitForJobToComplete', {
  time: stepfunctions.WaitTime.duration(cdk.Duration.seconds(30)),
})

PostExecutionCleanupTask

The goal of this task is to move the files generated by the job from the temporary location to the final one in order to achieve proper data partitioning.

The job will store the results in:

s3://tmp/myjob_timestamp1/measurement_date=2025-03-08/myjob_timestamp1_part001.parquet
s3://tmp/myjob_timestamp2/measurement_date=2025-03-08/myjob_timestamp2_part001.parquet
s3://tmp/myjob_timestamp3/measurement_date=2025-03-08/myjob_timestamp3_part001.parquet
s3://tmp/myjob_timestamp4/measurement_date=2025-03-09/myjob_timestamp4_part001.parquet

And we want the files to be stored under:

s3://data/measurement_date=2025-03-08/myjob_timestamp1_part001.parquet
s3://data/measurement_date=2025-03-08/myjob_timestamp2_part001.parquet
s3://data/measurement_date=2025-03-08/myjob_timestamp3_part001.parquet
s3://data/measurement_date=2025-03-09/myjob_timestamp4_part001.parquet

Essentially, we need to remove the job name and timestamp information from the key. A Lambda function will help us do so, and we can use the LambdaInvoke task for that:

const postExecutionCleanupTask = new tasks.LambdaInvoke(this, 'PostExecutionCleanupTask', {
  lambdaFunction: postExecutionCleanupFn,
  payload: stepfunctions.TaskInput.fromObject({
    destinationBucket: props.outDataBucket.bucketName,
    sourceKey: props.tmpKeyPattern,
    destinationKey: props.outKeyPattern,
  }),
  resultPath: '$.lambdaResult',
})

And the lambda code:

import {
  S3Client,
  ListObjectsV2Command,
  ListObjectsV2CommandOutput,
  CopyObjectCommand,
  DeleteObjectCommand,
} from '@aws-sdk/client-s3'
import type { Handler } from 'aws-lambda'
// assuming the Powertools logger; any logger with info/error methods will do
import { Logger } from '@aws-lambda-powertools/logger'

const logger = new Logger()
const s3Client = new S3Client()

interface MoveRequest {
  destinationBucket: string
  sourceKey: string
  destinationKey: string
}

interface Response {
  statusCode: number
  body: string
}

interface S3Object {
  Key?: string
}

export const handler: Handler<MoveRequest, Response> = async (event: MoveRequest): Promise<Response> => {
  try {
    const listObjectsCommand = new ListObjectsV2Command({
      Bucket: event.destinationBucket,
      Prefix: event.sourceKey,
    })

    const listedObjects: ListObjectsV2CommandOutput = await s3Client.send(listObjectsCommand)

    if (!listedObjects.Contents || listedObjects.Contents.length === 0) {
      return {
        statusCode: 200,
        body: JSON.stringify({ message: 'No files found to move' }),
      }
    }

    const movePromises: Promise<void>[] = (listedObjects.Contents as S3Object[]).map(
      async (object: S3Object): Promise<void> => {
        const sourceKey: string | undefined = object.Key

        if (!sourceKey || !sourceKey.endsWith('.csv')) {
          return
        }
        const pathParts: string[] = sourceKey.split('/')
        logger.info(`Source key=${sourceKey}, parts=${pathParts}`)
        if (pathParts.length < 3) {
          return
        }

        const destinationParts: string[] = pathParts.slice(2)
        const destinationKey: string = destinationParts.join('/')

        const copyCommand = new CopyObjectCommand({
          Bucket: event.destinationBucket,
          CopySource: `${event.destinationBucket}/${sourceKey}`,
          Key: `${event.destinationKey}/${destinationKey}`,
        })
        await s3Client.send(copyCommand)
        const deleteCommand = new DeleteObjectCommand({
          Bucket: event.destinationBucket,
          Key: sourceKey,
        })
        await s3Client.send(deleteCommand)

        logger.info(`Moved ${sourceKey} to ${event.destinationKey}/${destinationKey}.`)
      }
    )

    await Promise.all(movePromises.filter((promise): promise is Promise<void> => promise !== undefined))

    return {
      statusCode: 200,
      body: JSON.stringify({ message: 'Files moved successfully.' }),
    }
  } catch (error: unknown) {
    const errorMessage: string = error instanceof Error ? error.message : String(error)
    logger.error('Error moving files: ', errorMessage)
    return {
      statusCode: 500,
      body: JSON.stringify({
        message: 'Error moving files',
        error: errorMessage,
      }),
    }
  }
}

The execution result will be available under the lambdaResult path.

VerifyPostExecutionResult

The goal of this task is to verify whether the Lambda function has cleaned up and moved the results to the desired location. In case of failure, we want the pipeline to fail; in case of success, we can mark the execution as successful. We will again use a Choice state.

const verifyPostExecutionResult = new stepfunctions.Choice(this, 'VerifyPostExecutionResult')
  .when(stepfunctions.Condition.numberEquals('$.lambdaResult.StatusCode', 200), setLastSuccessfulExecutionTime)
  .otherwise(
    new stepfunctions.Fail(this, 'LambdaFailed', {
      cause: 'Lambda execution failed or returned non-200 status',
    })
  )

SetLastSuccessfulExecutionTime

And the last step in our state machine. The goal of this task is to update the last successful execution timestamp. We will use a CallAwsService task with a call to the SSM service. As the value, we will use the start timestamp of the state machine execution.

const setLastSuccessfulExecutionTime = new tasks.CallAwsService(this, 'SetLastSuccessfulExecutionTime', {
  service: 'ssm',
  action: 'putParameter',
  parameters: {
    Name: props.lastExecutionParameterName,
    Value: stepfunctions.JsonPath.stringAt('$.timestamps.current_timestamp'),
    Type: 'String',
    Overwrite: true,
  },
  iamResources: [`arn:aws:ssm:${stack.region}:${stack.account}:parameter${props.lastExecutionParameterName}`],
  resultPath: '$.setLastExecutionTimeResult',
})
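For reference, the tasks above can be chained into a state machine roughly like this (a minimal sketch; the exact wiring and declaration order in the actual construct may differ):

// close the polling loop: after waiting, check the job status again
waitForJobToComplete.next(checkJobStatus)
// after a successful run, verify the cleanup result and then store the checkpoint
postExecutionCleanupTask.next(verifyPostExecutionResult)

// the main path: read the checkpoint, set the date range, run the job, poll its status
const definition = getLastSuccessfulExecutionTime
  .next(prepareTimestampsTask)
  .next(updateDatasetTask)
  .next(startDatabrewJob)
  .next(checkJobStatus)
  .next(jobStatusChoice)

new stepfunctions.StateMachine(this, 'StateMachine', {
  definitionBody: stepfunctions.DefinitionBody.fromChainable(definition),
})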

And creating the pipeline in the stack itself:

new DatabrewPipeline(this, 'CleanMergeWeatherStationsDataPipeline', {
  outDataBucket: cleanDataBucket,
  tmpKeyPattern: 'tmp',
  outKeyPattern: 'data',
  rawDataBucket,
  rawKeyPattern: 'data/{sensor_id}/{ingestion_date}/<.*>.csv',
  lastExecutionParameterName: props.lastExecutionParameterName,
  dataset: weatherStationsDataset.name,
  datasetPathParameters: [
    {
      name: 'sensor_id',
    },
    {
      name: 'ingestion_date',
      createColumn: false,
    },
  ],
  job: cleanMergeOnDemandJob.name,
  jobRoleArn: cleanMergeOnDemandJobArn,
})
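Finally, to satisfy the every-6-hours requirement, the state machine can be triggered by an EventBridge rule. A minimal sketch, assuming the pipeline instance above is captured in a pipeline constant and the DatabrewPipeline construct exposes its state machine as a stateMachine property (both are assumptions on my part):

import * as events from 'aws-cdk-lib/aws-events'
import * as targets from 'aws-cdk-lib/aws-events-targets'

// trigger the pipeline state machine every 6 hours
new events.Rule(this, 'RunPipelineEverySixHours', {
  schedule: events.Schedule.rate(cdk.Duration.hours(6)),
  targets: [new targets.SfnStateMachine(pipeline.stateMachine)],
})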

Testing

To test the solution, we can upload one set of data (the 2025-03-08 directory for both sensors) to the raw data bucket and start the Step Function. It should finish after a while and produce a single result (partitioned by measurement date):

image

And S3 output:

image

The file contains merged data from both sensors with additional columns as expected.

timestamp,week_number,temperature,humidity,pressure,pressure_label,battery_level,signal_strength,sensor_id
"2025-03-08 08:00:00.0",10,21.5,45.0,999.0,LOW,98.5,-65,sensor-001
"2025-03-08 08:00:15.0",10,21.6,45.0,999.15,LOW,98.5,-66,sensor-001
"2025-03-08 08:00:30.0",10,21.7,46.0,999.3,LOW,98.4,-65,sensor-001
"2025-03-08 08:00:45.0",10,21.8,46.0,999.45,LOW,98.4,-67,sensor-001
"2025-03-08 08:01:00.0",10,21.9,46.0,999.6,LOW,98.4,-67,sensor-001
"2025-03-08 08:01:15.0",10,22.0,46.0,999.75,LOW,98.3,-66,sensor-001
"2025-03-08 08:01:30.0",10,22.2,46.0,999.9,LOW,98.3,-66,sensor-001
"2025-03-08 08:01:45.0",10,22.3,46.0,1000.05,NORMAL,98.3,-68,sensor-001
"2025-03-08 08:02:00.0",10,22.4,47.0,1000.2,NORMAL,98.2,-68,sensor-001
"2025-03-08 08:02:15.0",10,22.6,47.0,1000.35,NORMAL,98.2,-67,sensor-001
"2025-03-08 08:02:30.0",10,22.7,47.0,1000.5,NORMAL,98.2,-67,sensor-001
"2025-03-08 08:02:45.0",10,22.8,47.0,1000.65,NORMAL,98.1,-66,sensor-001
"2025-03-08 08:03:00.0",10,22.9,47.0,1000.8,NORMAL,98.1,-65,sensor-001
"2025-03-08 08:03:15.0",10,23.0,48.0,1000.95,NORMAL,98.1,-65,sensor-001
"2025-03-08 08:03:30.0",10,23.1,48.0,1001.1,NORMAL,98.0,-64,sensor-001
"2025-03-08 08:03:45.0",10,23.2,48.0,1001.25,NORMAL,98.0,-64,sensor-001
"2025-03-08 08:04:00.0",10,23.3,48.0,1001.4,NORMAL,98.0,-65,sensor-001
"2025-03-08 08:04:15.0",10,23.4,48.0,1001.55,NORMAL,97.9,-65,sensor-001
"2025-03-08 08:04:30.0",10,23.5,48.0,1001.7,NORMAL,97.9,-66,sensor-001
"2025-03-08 08:04:45.0",10,23.5,48.0,1001.85,NORMAL,97.9,-66,sensor-001
"2025-03-08 08:05:00.0",10,23.6,49.0,1002.0,NORMAL,97.8,-67,sensor-001
"2025-03-08 09:30:00.0",10,19.8,49.0,1014.6,NORMAL,99.1,-61,sensor-002
"2025-03-08 09:30:15.0",10,19.9,49.0,1014.5,NORMAL,99.1,-61,sensor-002
"2025-03-08 09:30:30.0",10,20.0,49.0,1014.5,NORMAL,99.0,-62,sensor-002
"2025-03-08 09:30:45.0",10,20.1,49.0,1014.4,NORMAL,99.0,-62,sensor-002
"2025-03-08 09:31:00.0",10,20.2,49.0,1014.3,NORMAL,99.0,-62,sensor-002
"2025-03-08 09:31:15.0",10,20.3,50.0,1014.3,NORMAL,98.9,-63,sensor-002
"2025-03-08 09:31:30.0",10,20.4,50.0,1014.2,NORMAL,98.9,-63,sensor-002
"2025-03-08 09:31:45.0",10,20.5,50.0,1014.1,NORMAL,98.9,-63,sensor-002
"2025-03-08 09:32:00.0",10,20.6,50.0,1014.1,NORMAL,98.8,-64,sensor-002
"2025-03-08 09:32:15.0",10,20.7,50.0,1014.0,NORMAL,98.8,-64,sensor-002
"2025-03-08 09:32:30.0",10,20.8,51.0,1013.9,NORMAL,98.8,-64,sensor-002
"2025-03-08 09:32:45.0",10,20.9,51.0,1013.9,NORMAL,98.7,-65,sensor-002
"2025-03-08 09:33:00.0",10,21.0,51.0,1013.8,NORMAL,98.7,-65,sensor-002
"2025-03-08 09:33:15.0",10,21.1,51.0,1013.7,NORMAL,98.7,-65,sensor-002
"2025-03-08 09:33:30.0",10,21.2,51.0,1013.7,NORMAL,98.6,-66,sensor-002
"2025-03-08 09:33:45.0",10,21.3,52.0,1013.6,NORMAL,98.6,-66,sensor-002
"2025-03-08 09:34:00.0",10,21.4,52.0,1013.5,NORMAL,98.6,-66,sensor-002
"2025-03-08 09:34:15.0",10,21.5,52.0,1013.5,NORMAL,98.5,-67,sensor-002
"2025-03-08 09:34:30.0",10,21.6,52.0,1013.4,NORMAL,98.5,-67,sensor-002
"2025-03-08 09:34:45.0",10,21.7,52.0,1013.3,NORMAL,98.5,-67,sensor-002
"2025-03-08 09:35:00.0",10,21.8,53.0,1013.3,NORMAL,98.4,-68,sensor-002

And the last step is to verify that the solution supports incremental data processing. We can upload a second dataset and start the Step Function again. As expected, only the new files were processed.

Summary

Based on my research, AWS Glue DataBrew serves specific use cases well, particularly simplifying data processing and transformation tasks. When combined with a few additional tools, it enables the creation of lightweight data pipelines with incremental data processing, without the need to write Spark jobs. This makes it a viable alternative for those seeking a low-code or no-code data processing solution.

All the code is available on GitHub.