
Integrating with AWS Personalize

About AWS Personalize

AWS Personalize is a recommendation engine provided by AWS as a model-as-a-service offering: you can define, train, and host a recommender system without running any servers yourself.

To use AWS Personalize, you need to provide initial training data, which is expected to live in an S3 bucket. In this article, we will go over how you can use the C360 data lake to provide input data for Personalize. We will also look at how to feed data back into the C360 data lake and collate it for future re-training.

Supporting a Recommender System with C360

General Data Flow

[Figure: blog_architecture_start_simple]

Data in the C360 data lake is divided into 3 zones:

- upstream, where raw data from source systems is dropped
- lake, where cleaned and transformed data lives
- downstream, where output data is published for other services to consume

The C360 data lake uses S3 as its main data storage service, and under the hood these zones are separate S3 buckets. This allows C360 to quickly interface with many services that support reading data from S3.

To give data to Personalize, we need a pipeline that produces the required Personalize input in the downstream zone.

Pipelines on the C360 Data Lake

The data lake is responsible for preparing the input for Personalize to import, which in this case is a single CSV file. In this article, we will assume that the input table required by Personalize has the following columns:

USER_ID,ITEM_ID,TIMESTAMP
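
For illustration, a few rows in this format could look like the following (the IDs are made up; the TIMESTAMP column is in Unix epoch time):

USER_ID,ITEM_ID,TIMESTAMP
u-1001,dashboard-42,1609459200
u-1002,dashboard-17,1609462800
u-1001,dashboard-17,1609466400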

If you have historical data dropped into the upstream zone, you can write a C360 pipeline that cleans and transforms the data into the required schema. C360 pipelines use AWS Athena to run queries and transform data; Athena stores its results in S3 as either a few compressed Parquet files or a few compressed CSV files.

At the end of this pipeline, there will be an Athena table at crm_derived_restricted.t200_dashboard_interaction.
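
The pipeline definition itself is out of scope for this post, but to give a feel for the kind of transformation involved, here is a rough sketch that runs an Athena CTAS query with boto3. The source table crm_raw.dashboard_clicks, its columns, and the result location are assumptions for illustration only.

# Sketch only: in a real setup this query runs as a C360 pipeline step.
# The source table and its columns are hypothetical.
import boto3

athena = boto3.client("athena")

QUERY = """
CREATE TABLE crm_derived_restricted.t200_dashboard_interaction
WITH (format = 'TEXTFILE', field_delimiter = ',') AS
SELECT
    user_id AS "USER_ID",
    item_id AS "ITEM_ID",
    CAST(to_unixtime(event_time) AS BIGINT) AS "TIMESTAMP"
FROM crm_raw.dashboard_clicks
WHERE user_id IS NOT NULL AND item_id IS NOT NULL
"""

athena.start_query_execution(
    QueryString=QUERY,
    ResultConfiguration={"OutputLocation": "s3://acme-c360-lake/athena_results/"},
)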

Once we have the data in the correct schema, what's left is to put it in the downstream zone in a form that AWS Personalize can take: a single, uncompressed CSV file. To do that we can use an AWS Glue Job. In general, a final transformation of data into a format that Athena does not support out-of-the-box can be done with AWS Glue Jobs. You can write a simple Python script to read from S3, transform the data, and write the result to downstream.

In this example, let's say that we decide on the following path for AWS Personalize to read from, and for the final step to write into.

s3://acme-c360-downstream/recommender/personalize_input.csv

Note: C360 pipelines store their output separately per run date, so don't worry about overwriting your downstream data; the pipeline output can quickly be re-created for any given date.

Now we know that we need an AWS Glue Job that does the following:

- resolve its arguments (stage, dataset, table, sink zone, and sink S3 key)
- work out the S3 prefix of the Athena output for the current run date
- download every compressed file under that prefix, decompress it, and append it to a temporary file
- upload the temporary file to the downstream bucket as a single, uncompressed CSV

Said Glue Job script will be as follows:

# Libraries that are supported out-of-the-box are listed here:
# https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html
import boto3
import gzip
import os
import tempfile

from boto3.s3.transfer import TransferConfig

from datalake.common import get_now
from datalake.meta import DataAddress
import datalake.glue as g


@g.wrap_pipeline_glue_job
def handler():
    # 1. get glue arguments
    args = g.get_resolved_options(
        ["stage", "dataset", "table", "sink_zone", "sink_s3_key"]
    )

    stage = args["stage"]
    dataset = args["dataset"]
    table = args["table"]
    sink_zone = args["sink_zone"]
    sink_s3_key = args["sink_s3_key"]

    source_s3_bucket = "acme-c360-lake"
    sink_s3_bucket = f"acme-c360-{sink_zone}"

    # 2. getting table paths
    source_da = DataAddress(
        dataset=dataset,
        table=table,
        date=get_now(),
        stage=stage,
    )
    source_s3_prefix = source_da.s3_key(fill_date=True)

    # 3. List the files under the source path
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix=source_s3_prefix)

    object_list = [obj["Key"] for obj in response["Contents"]]

    temp_sink = tempfile.NamedTemporaryFile(delete=False)

    # 4. For each file in the S3 path, download them and append to temp file
    for obj_key in object_list:
        obj = s3.get_object(Bucket=source_s3_bucket, Key=obj_key)
        gzip_obj = gzip.GzipFile(fileobj=obj["Body"])

        for line in gzip_obj:
            temp_sink.write(line)

    temp_sink.close()  # needs to be closed before boto3 can read it

    # 5. Upload the temp file as a single CSV
    s3.upload_file(
        Filename=temp_sink.name,
        Bucket=sink_s3_bucket,
        Key=sink_s3_key,
        # disable threaded transfer; multi-threaded uploads can misbehave
        # in Glue Python shell jobs
        Config=TransferConfig(use_threads=False),
    )

    os.remove(temp_sink.name)


if __name__ == "__main__":
    handler()

Save this under external/jobs as dashboard_interaction_export.py. Any Python script in this directory will be uploaded to S3 under s3://acme-c360-infra/glue_jobs/<stage>/<filename>.py.

There are a few noticeable differences between writing a typical Glue job and a C360 Glue job:

- the handler is wrapped with the @g.wrap_pipeline_glue_job decorator from the c360_lake library
- job arguments are read with g.get_resolved_options rather than the awsglue utilities
- source locations are derived from a DataAddress for the current run date instead of hard-coded S3 keys
- the c360_lake wheel is supplied to the job through the --extra-py-files default argument

Now that we have the script, we can create a Glue Job. In serverless.yml, we can do this by including the following entry under resources.Resources:

DashboardInteractionDownstreamWriter:
    Type: AWS::Glue::Job
    Properties:
        Role: arn:aws:iam::#{AWS::AccountId}:role/c360-ExternalLakeExecutor
        Command:
            Name: pythonshell
            ScriptLocation: ${self:custom.glue_job_prefix}/dashboard_interaction_export.py
            PythonVersion: "3"
        DefaultArguments:
            --extra-py-files: ${self:custom.glue_job_prefix}/c360_lake-0.1.3rc10-py3-none-any.whl
        GlueVersion: "1.0"  # has Python 3.6
        Name: ${self:service}-Job-DashboardInteractionDownstreamWriter-${self:provider.stage}

Note: This is how the Serverless Framework handles CloudFormation-compatible resources. For more options on the Glue job definition, see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-glue-job-jobcommand.html

Once you have the script and the job definition in place, you can include it as part of your pipeline, with:

    # pipeline is a DataPipeline object

    pipeline.add_glue_task(
        "DashboardPersonalizeWriteDownstream",
        job_name="Job-DashboardInteractionDownstreamWriter",
        # make sure it matches the job name, without the prefix & suffix
        arguments={
            "--dataset": "crm_derived_restricted",
            "--table": "t200_dashboard_interaction",
            "--sink_zone": "downstream",
            "--sink_s3_key": "recommender/personalize_input.csv",
        },
    )

With that, your pipeline writes to the downstream bucket, from which AWS Personalize can pick up the data.

Recommendation Service

Using AWS Personalize, you can now create a Personalize dataset by pointing it at the S3 path we decided on above, and proceed with creating the resources for the recommender system. We suggest that you follow the examples in the amazon-personalize-samples repo, particularly the Building Your First Campaign and View Campaign and Interactions example notebooks, to get started.
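
For reference, importing that CSV into Personalize boils down to a dataset import job pointing at the downstream key. A minimal sketch with boto3, assuming the dataset and the Personalize S3 access role already exist (both ARNs are placeholders):

import boto3

personalize = boto3.client("personalize")

personalize.create_dataset_import_job(
    jobName="dashboard-interactions-import",
    datasetArn="arn:aws:personalize:ap-southeast-1:123456789012:dataset/dashboard-recommender/INTERACTIONS",
    dataSource={"dataLocation": "s3://acme-c360-downstream/recommender/personalize_input.csv"},
    roleArn="arn:aws:iam::123456789012:role/PersonalizeS3AccessRole",
)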

Note: Permissions need to be set up so that AWS Personalize can access the bucket; in this case, we only grant it access to the acme-c360-downstream bucket.
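
One way to grant this is a bucket policy that allows the Personalize service principal to read the downstream bucket. The statement below is a sketch of what that could look like; adjust it to your own account and security requirements.

import json
import boto3

BUCKET = "acme-c360-downstream"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "personalize.amazonaws.com"},
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{BUCKET}",
                f"arn:aws:s3:::{BUCKET}/*",
            ],
        }
    ],
}

boto3.client("s3").put_bucket_policy(Bucket=BUCKET, Policy=json.dumps(policy))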

After that, you will have the following resources in AWS Personalize:

- a dataset group containing an interactions dataset (and its schema) populated from the CSV
- a solution and a trained solution version
- a campaign serving recommendations from that solution version
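
Once the campaign is live, other services can query it for recommendations. A minimal sketch (the campaign ARN is a placeholder):

import boto3

personalize_runtime = boto3.client("personalize-runtime")

response = personalize_runtime.get_recommendations(
    campaignArn="arn:aws:personalize:ap-southeast-1:123456789012:campaign/dashboard-recommender",
    userId="u-1001",
    numResults=10,
)

for item in response["itemList"]:
    print(item["itemId"], item.get("score"))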

Feeding Back to C360 for Evaluation & Iteration

Real-Time Event Feed

With Personalize, it is possible to feed in recent event data, which allows the model to adjust its recommendations.

While AWS Personalize can be fed recent data through its EventTracker, we also want this data to flow back into our C360 data lake, so that we can run analysis on the history. Thus we want somewhere to collect the event data, feed it to AWS Personalize, and write it back to our lake at the same time.
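
Creating the EventTracker itself is a one-off step; the tracking ID it returns is what the consumer Lambda further below uses. A sketch with boto3 (the dataset group ARN is a placeholder):

import boto3

personalize = boto3.client("personalize")

response = personalize.create_event_tracker(
    name="dashboard-recommender-tracker",
    datasetGroupArn="arn:aws:personalize:ap-southeast-1:123456789012:dataset-group/dashboard-recommender",
)

# Use this value as TRACKING_ID in the consumer Lambda.
print(response["trackingId"])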

Feeding Back to C360

Giving data back to our C360 data lake is as simple as writing the data to our upstream S3 bucket. This can be achieved with an AWS Kinesis stream (a data streaming service) and AWS Kinesis Firehose (which can deliver a stream to S3). Hence, we need to update our stack as follows.

[Figure: blog_architecture_start_loop]

Note: This is inspired by https://github.com/aws-samples/amazon-personalize-samples/tree/master/next_steps/operations/streaming_events

This has 2 main components:

- a Kinesis stream that receives interaction events, with a Lambda consumer that forwards each event to the Personalize EventTracker
- a Kinesis Firehose delivery stream that reads from the same Kinesis stream and writes the raw events to the upstream S3 bucket

This is usually a separate stack/service that feeds data into C360; that is, from the C360 lake's perspective, it only sees that the upstream bucket receives data. In our example, however, we will build it as an internal service, meaning that although the stack works as a separate unit in the cloud, the code can still live within the same project. This kind of service conventionally lives under services/. In this example, it will be:

root/
    services/
        recommender/
            lambda_handler/
                personalize_consume_event.py
            serverless.yml

The Kinesis stream, the Firehose delivery stream (with its IAM role), and the Lambda consumer can be declared in serverless.yml as follows:

service: ${self:custom.tenant}-c360-dashboard-recommender
frameworkVersion: "=1.78.1"

provider:
  name: aws
  runtime: python3.6
  region: ap-southeast-1
  stage: ${env:LAKE_DEV_ID, 'dev'}
  deploymentBucket:
    name: ${self:custom.tenant}-c360-cache${self:custom.bucket_suffix}

custom:
  tenant: acme
  pythonRequirements:
    slim: true
  bucket_suffixes:
    prod: ""
    staging: "-staging"
    other: "-dev"
  bucket_suffix: ${self:custom.bucket_suffixes.${self:provider.stage}, self:custom.bucket_suffixes.other}
  # per-stage key prefix for the Firehose S3 destination (example values; referenced below)
  pathkey_prefixes:
    prod: ""
    staging: ""
    other: "${self:provider.stage}/"

functions:
  PersonalizeKinesisConsumer:
    name: ${self:service}-PersonalizeKinesisConsumer-${self:provider.stage}
    handler: lambda_handler.personalize_consume_event.run
    role: arn:aws:iam::#{AWS::AccountId}:role/c360-ExternalLakeExecutor
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - PersonalizeEventStream
              - Arn

resources:
  Resources:
    PersonalizeEventStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ${self:service}-PersonalizeStream-${self:provider.stage}
        ShardCount: 5
    PersonalizeStreamFirehoseRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: ${self:service}-PersonalizeStreamFirehoseRole-${self:provider.stage}
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - firehose.amazonaws.com
              Action: sts:AssumeRole
        Policies:
          - PolicyName: root
            PolicyDocument:
              Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Action: s3:*
                  Resource:
                    - Fn::Sub: arn:aws:s3:::${self:custom.tenant}-c360-upstream${self:custom.bucket_suffix}
                    - Fn::Sub: arn:aws:s3:::${self:custom.tenant}-c360-upstream${self:custom.bucket_suffix}/*
    PersonalizeStreamFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: ${self:service}-PersonalizeStreamFirehose-${self:provider.stage}
        KinesisStreamSourceConfiguration:
          KinesisStreamArn:
            Fn::GetAtt: [PersonalizeEventStream, Arn]
          RoleARN:
            Fn::GetAtt: [PersonalizeStreamFirehoseRole, Arn]
        S3DestinationConfiguration:
          BucketARN:
            Fn::Sub: arn:aws:s3:::${self:custom.tenant}-c360-upstream${self:custom.bucket_suffix}
          BufferingHints:
            IntervalInSeconds: 300
            SizeInMBs: 5
          CompressionFormat: UNCOMPRESSED
          Prefix: ${self:custom.pathkey_prefixes.${self:provider.stage}, self:custom.pathkey_prefixes.other}crm/dashboard_interaction/latest/
          RoleARN:
            Fn::GetAtt: [PersonalizeStreamFirehoseRole, Arn]

personalize_consume_event.py contains the Lambda function that feeds data into Personalize. It looks like the following:

import base64
import json
import boto3

TRACKING_ID = "<YOUR-EVENT-TRACKING-ID>"


def run(event, context):
    for record in event["Records"]:
        try:
            data = base64.b64decode(record["kinesis"]["data"])
            print("decoded payload", data)

            personalize_event = json.loads(data)

            put_personalize_event(
                tracking_id=TRACKING_ID,
                user_id=personalize_event["user_id"],
                session_id=personalize_event["session_id"],
                eventList=personalize_event["event_list"],
            )
        except Exception as e:
            print(
                "cannot process stream",
                dict(raw_data=record["kinesis"]["data"], error=str(e)),
            )


def put_personalize_event(tracking_id, user_id, session_id, eventList):
    personalize_events = boto3.client("personalize-events")
    personalize_events.put_events(
        trackingId=tracking_id,
        userId=user_id,
        sessionId=session_id,
        eventList=eventList,
    )
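
For completeness, here is a sketch of what a producer (for example, the dashboard backend) might put onto the Kinesis stream. The payload shape matches what the consumer above expects; the stream name assumes the prod stage of the stack above, and the field values are made up.

import json
import time

import boto3

kinesis = boto3.client("kinesis")

event = {
    "user_id": "u-1001",
    "session_id": "session-abc",
    "event_list": [
        {
            "eventType": "click",
            "itemId": "dashboard-42",
            "sentAt": int(time.time()),
        }
    ],
}

kinesis.put_record(
    StreamName="acme-c360-dashboard-recommender-PersonalizeStream-prod",
    Data=json.dumps(event).encode("utf-8"),
    PartitionKey=event["user_id"],
)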

Evaluation Pipeline

Once you have the data back in S3, you can do 2 things:

- collate the streamed events with the historical interaction data, so the export pipeline can re-train Personalize on fresh data
- build an evaluation pipeline that compares the recommendations being served with the interactions that actually happened, so you can measure how well the campaign performs