AWS Personalize is a recommendation engine provided by AWS as a model-as-a-service offering: you can define, train, and host a recommender system without running any servers yourself.
To use AWS Personalize, you need to provide initial training data, which is expected to live in an S3 bucket. In this article, we will go over how you can use the C360 data lake to provide input data for Personalize. In addition, we will also look at how you can feed data back into the C360 data lake and collate it for future re-training.
Data in the C360 data lake is divided into 3 zones:
- upstream - where raw source data is stored
- lake - where data lives in a more processed form
- downstream - where processed data is stored, ready to be consumed
The C360 data lake uses S3 as its main data storage service, and under the hood these zones are separate S3 buckets. This allows C360 to quickly interface with many services that support reading data from S3.
To give data to Personalize, we will need a pipeline that produces the required Personalize input within downstream.
The data lake is responsible for preparing the input for Personalize to import, which in this case is a single CSV file. In this article, we will assume that the input table required by Personalize has the following columns:
USER_ID,ITEM_ID,TIMESTAMP
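For illustration, a few hypothetical rows of such a file could look like this (TIMESTAMP is a Unix epoch time in seconds; the IDs are made up):
USER_ID,ITEM_ID,TIMESTAMP
u-1001,dashboard-42,1600000000
u-1001,dashboard-17,1600000360
u-1002,dashboard-42,1600000420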
If you have historical data dropped to upstream, you can write a C360 pipeline that cleans and transforms the data into the required schema. C360 pipelines use AWS Athena to run queries and transform data, and Athena stores its results in S3 as either a few compressed Parquet files or a few compressed CSV files. At the end of this pipeline, there will be an Athena table at crm_derived_restricted.t200_dashboard_interaction.
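Purely as an illustration of the kind of transformation such a pipeline step performs, the query below shapes a hypothetical raw table into the three required columns. It is shown here as a raw Athena call via boto3; in practice the query would be defined through the C360 pipeline, and crm_raw.dashboard_views with its columns is an assumed placeholder.

# Illustrative only: a CTAS query that shapes raw interaction data into the
# three columns Personalize needs. "crm_raw.dashboard_views" and its columns
# are hypothetical placeholders.
import boto3

athena = boto3.client("athena")

query = """
CREATE TABLE crm_derived_restricted.t200_dashboard_interaction AS
SELECT
    user_id                                AS "USER_ID",
    dashboard_id                           AS "ITEM_ID",
    CAST(to_unixtime(viewed_at) AS BIGINT) AS "TIMESTAMP"
FROM crm_raw.dashboard_views   -- hypothetical raw source table
WHERE user_id IS NOT NULL
"""

athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://acme-c360-lake/athena-results/"},
)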
Once we have the data in the correct schema, what's left is to put it in downstream in a form that AWS Personalize can take: a single, uncompressed CSV file. To do that we can utilize an AWS Glue Job. In general, the final transformation of data into a certain format, if it is not supported by Athena out-of-the-box, can be done with AWS Glue Jobs. You can write a simple Python script to read from S3, transform the data, and write the result to downstream.
In this example, let's say that we decide on the following path for AWS Personalize to read from, and for the final step to write into.
s3://acme-c360-downstream/recommender/personalize_input.csv
Note: C360 pipelines store data separately for different run dates, so don't worry about overwriting your downstream data; the pipeline output can be quickly re-created for a given date.
Now we know that we need an AWS Glue Job that does the following:
- Read from crm_derived_restricted.t200_dashboard_interaction (output of the pipeline). This will be multiple gzipped CSV files.
- Decompress and concatenate them, then write the result to downstream as a single uncompressed CSV file.
Said Glue Job script will be as follows:
# Libraries that are supported out-of-the-box are listed here:
# https://docs.aws.amazon.com/glue/latest/dg/add-job-python.html
import boto3
import gzip
import os
import tempfile

from boto3.s3.transfer import TransferConfig

from datalake.common import get_now
from datalake.meta import DataAddress
import datalake.glue as g


@g.wrap_pipeline_glue_job
def handler():
    # 1. Get Glue arguments
    args = g.get_resolved_options(
        ["stage", "dataset", "table", "sink_zone", "sink_s3_key"]
    )
    stage = args["stage"]
    dataset = args["dataset"]
    table = args["table"]
    sink_zone = args["sink_zone"]
    sink_s3_key = args["sink_s3_key"]
    source_s3_bucket = "acme-c360-lake"
    sink_s3_bucket = f"acme-c360-{sink_zone}"

    # 2. Resolve the S3 prefix of the source table for the current run date
    source_da = DataAddress(
        dataset=dataset,
        table=table,
        date=get_now(),
        stage=stage,
    )
    source_s3_prefix = source_da.s3_key(fill_date=True)

    # 3. List every object under the source path
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix=source_s3_prefix)
    object_list = [obj["Key"] for obj in response["Contents"]]

    temp_sink = tempfile.NamedTemporaryFile(delete=False)

    # 4. For each file in the S3 path, download it, decompress it, and append to the temp file
    for obj_key in object_list:
        obj = s3.get_object(Bucket=source_s3_bucket, Key=obj_key)
        gzip_obj = gzip.GzipFile(fileobj=obj["Body"])
        for line in gzip_obj:
            temp_sink.write(line)
    temp_sink.close()  # needs to be closed before boto3 can read it

    # 5. Upload the file as a single CSV
    s3.upload_file(
        Filename=temp_sink.name,
        Bucket=sink_s3_bucket,
        Key=sink_s3_key,
        Config=TransferConfig(use_threads=False),
    )
    os.remove(temp_sink.name)


if __name__ == "__main__":
    handler()
Save this under external/jobs as dashboard_interaction_export.py. Any Python script in this directory will be uploaded to S3 under s3://acme-c360-infra/glue_jobs/<stage>/filename.py.
There are noticeable differences between writing a typical Glue job and a C360 Glue job:
- C360 Glue jobs are wrapped in @wrap_pipeline_glue_job from datalake.glue. This makes it possible for the job to be called from within a Step Functions state machine and communicate back to it. It also allows normal datalake modules to work properly by setting the appropriate datalake context.
- Instead of awsglue.utils.getResolvedOptions() to retrieve arguments, you use datalake.glue.get_resolved_options(). This removes the need to import awsglue at the top of the script, so that one can easily test the Glue job script by mocking datalake.glue.get_resolved_options (see the test sketch after this list).
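As an example, a unit test for the script might look like the sketch below. It assumes the script is importable as a module, that @wrap_pipeline_glue_job behaves as a pass-through when run outside a pipeline, and that DataAddress and get_now resolve without AWS access; these are assumptions about the datalake package, not guarantees.

# test_dashboard_interaction_export.py -- a sketch, not a definitive test.
import gzip
import io
from unittest import mock

import dashboard_interaction_export as job


def test_handler_concatenates_gzipped_parts():
    fake_args = {
        "stage": "dev",
        "dataset": "crm_derived_restricted",
        "table": "t200_dashboard_interaction",
        "sink_zone": "downstream",
        "sink_s3_key": "recommender/personalize_input.csv",
    }

    # Build one gzipped CSV "part" in memory to stand in for a pipeline output file.
    gz_body = io.BytesIO()
    with gzip.GzipFile(fileobj=gz_body, mode="wb") as f:
        f.write(b"u-1001,dashboard-42,1600000000\n")
    gz_body.seek(0)

    fake_s3 = mock.Mock()
    fake_s3.list_objects_v2.return_value = {"Contents": [{"Key": "part-000.csv.gz"}]}
    fake_s3.get_object.return_value = {"Body": gz_body}

    # Bypass the Glue runtime and AWS by mocking get_resolved_options and boto3.client.
    with mock.patch.object(job.g, "get_resolved_options", return_value=fake_args), \
         mock.patch.object(job.boto3, "client", return_value=fake_s3):
        job.handler()

    fake_s3.upload_file.assert_called_once()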
Now that we have the script, we can create a Glue Job. In serverless.yml, we can do this by including the following entry under resources.Resources:
DashboardInteractionDownstreamWriter:
  Type: AWS::Glue::Job
  Properties:
    Role: arn:aws:iam::#{AWS::AccountId}:role/c360-ExternalLakeExecutor
    Command:
      Name: pythonshell
      ScriptLocation: ${self:custom.glue_job_prefix}/dashboard_interaction_export.py
      PythonVersion: "3"
    DefaultArguments:
      --extra-py-files: ${self:custom.glue_job_prefix}/c360_lake-0.1.3rc10-py3-none-any.whl
    GlueVersion: "1.0" # has python 3.6
    Name: ${self:service}-Job-DashboardInteractionDownstreamWriter-${self:provider.stage}
Note: This is how the Serverless Framework handles CloudFormation-compatible resources. For more options, see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-glue-job-jobcommand.html
Once you have the script and the job definition in place, you can include it as part of your pipeline, with:
# pipeline is a DataPipeline object
pipeline.add_glue_task(
    "DashboardPersonalizeWriteDownstream",
    job_name="Job-DashboardInteractionDownstreamWriter",
    # make sure it matches the job name, without the prefix & suffix
    arguments={
        "--dataset": "crm_derived_restricted",
        "--table": "t200_dashboard_interaction",
        "--sink_zone": "downstream",
        "--sink_s3_key": "recommender/personalize_input.csv",
    },
)
Thus, your pipeline will be writing to the downstream bucket, from which AWS Personalize can pick the data up.
In AWS Personalize, you may now create a dataset by pointing to the S3 path we decided on above, and proceed with creating the resources for the recommender system. We suggest that you follow the examples in the amazon-personalize-samples repo, particularly the Building Your First Campaign and View Campaign and Interactions example notebooks, to get started.
Note: Permissions need to be set up such that AWS Personalize can access the bucket; in this case, we are only giving permissions to the acme-c360-downstream bucket.
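As a rough sketch, the import itself is a single boto3 call; the job name, dataset ARN, and role ARN below are hypothetical placeholders:

import boto3

personalize = boto3.client("personalize")

# Kick off an import of the downstream CSV into the interactions dataset.
personalize.create_dataset_import_job(
    jobName="dashboard-interactions-import",  # hypothetical job name
    datasetArn="arn:aws:personalize:ap-southeast-1:123456789012:dataset/dashboard/INTERACTIONS",  # placeholder
    dataSource={"dataLocation": "s3://acme-c360-downstream/recommender/personalize_input.csv"},
    roleArn="arn:aws:iam::123456789012:role/PersonalizeS3AccessRole",  # placeholder role with read access to the bucket
)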
After that, you will have the following resources in AWS Personalize:
- a dataset (from the S3 path we defined)
- a solution and a solution version, which is the trained model
- a campaign, which hosts your trained model and can accept requests for recommendations (see the sketch after this list)
- an event tracker with a unique tracking ID for new events
With Personalize, it is possible to feed it with recent event data, which allows the model to adjust its recommendations.
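Before moving on to event data, here is what querying the campaign mentioned above looks like from an application; the campaign ARN and user ID are hypothetical placeholders:

import boto3

personalize_runtime = boto3.client("personalize-runtime")

# Ask the campaign for the top recommendations for one user.
response = personalize_runtime.get_recommendations(
    campaignArn="arn:aws:personalize:ap-southeast-1:123456789012:campaign/dashboard-recommender",  # placeholder
    userId="u-1001",  # hypothetical user
    numResults=10,
)
print([item["itemId"] for item in response["itemList"]])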
While AWS Personalize can be fed with recent data through its EventTracker, we also want this data to go back to our C360 data lake, so that we can run analysis on the history. Thus we want somewhere to collect the event data, and feed it to AWS Personalize while writing back to our lake at the same time.
Giving data back to our C360 data lake is as simple as writing your data to our upstream S3 bucket. This can be achieved with AWS Kinesis (a data streaming service) and AWS Kinesis Firehose (which can deliver a stream to S3). Hence, we need to update our stack as follows.
Note: This is inspired by https://github.com/aws-samples/amazon-personalize-samples/tree/master/next_steps/operations/streaming_events
This has 2 main components:
- a Kinesis stream, with a Firehose delivery stream that writes incoming events to the upstream bucket
- a Lambda function that consumes the same stream and forwards the events to AWS Personalize
This is usually a separate stack/service that feeds data into C360; that is, from the C360 lake's perspective, it only sees that the upstream bucket receives data. In our example, however, we will have this as an internal service, meaning that although the stack works as a separate unit in the cloud, the code can still live within the same project. This kind of service conventionally lives under services/. In this example, it will be:
root/
  services/
    recommender/
      lambda_handler/
        personalize_consume_event.py
      serverless.yml
The three resources (a Kinesis stream, an IAM role for the Firehose, and the Firehose delivery stream itself) can be declared in serverless.yml as follows:
service: ${self:custom.tenant}-c360-dashboard-recommender
frameworkVersion: "=1.78.1"

provider:
  name: aws
  runtime: python3.6
  region: ap-southeast-1
  stage: ${env:LAKE_DEV_ID, 'dev'}
  deploymentBucket:
    name: ${self:custom.tenant}-c360-cache${self:custom.bucket_suffix}

custom:
  tenant: acme
  pythonRequirements:
    slim: true
  bucket_suffixes:
    prod: ""
    staging: "-staging"
    other: "-dev"
  bucket_suffix: ${self:custom.bucket_suffixes.${self:provider.stage}, self:custom.bucket_suffixes.other}

functions:
  PersonalizeKinesisConsumer:
    name: ${self:service}-PersonalizeKinesisConsumer-${self:provider.stage}
    handler: external.personalize_consume_event.run
    role: arn:aws:iam::#{AWS::AccountId}:role/c360-ExternalLakeExecutor
    events:
      - stream:
          type: kinesis
          arn:
            Fn::GetAtt:
              - PersonalizeEventStream
              - Arn

resources:
  Resources:
    PersonalizeEventStream:
      Type: AWS::Kinesis::Stream
      Properties:
        Name: ${self:service}-PersonalizeStream-${self:provider.stage}
        ShardCount: 5

    PersonalizeStreamFirehoseRole:
      Type: AWS::IAM::Role
      Properties:
        RoleName: ${self:service}-PersonalizeStreamFirehoseRole-${self:provider.stage}
        AssumeRolePolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Principal:
                Service:
                  - firehose.amazonaws.com
              Action: sts:AssumeRole
        Policies:
          - PolicyName: root
            PolicyDocument:
              Version: '2012-10-17'
              Statement:
                - Effect: Allow
                  Action: s3:*
                  Resource:
                    Fn::Sub: arn:aws:s3:::${self:custom.tenant}-c360-upstream${self:custom.bucket_suffix}

    PersonalizeStreamFirehose:
      Type: AWS::KinesisFirehose::DeliveryStream
      Properties:
        DeliveryStreamName: ${self:service}-PersonalizeStreamFirehose-${self:provider.stage}
        KinesisStreamSourceConfiguration:
          KinesisStreamArn:
            Fn::GetAtt: [PersonalizeEventStream, Arn]
          RoleARN:
            Fn::GetAtt: [PersonalizeStreamFirehoseRole, Arn]
        S3DestinationConfiguration:
          BucketARN:
            Fn::Sub: arn:aws:s3:::${self:custom.tenant}-c360-upstream${self:custom.bucket_suffix}
          BufferingHints:
            IntervalInSeconds: 300
            SizeInMBs: 5
          CompressionFormat: UNCOMPRESSED
          Prefix: ${self:custom.pathkey_prefixes.${self:provider.stage}, self:custom.pathkey_prefixes.other}crm/dashboard_interaction/latest/
          RoleARN:
            Fn::GetAtt: [PersonalizeStreamFirehoseRole, Arn]
personalize_consume_event.py is the Lambda function that feeds data into Personalize. It looks like the following:
import base64
import json

import boto3

TRACKING_ID = "<YOUR-EVENT-TRACKING-ID>"


def run(event, context):
    for record in event["Records"]:
        try:
            data = base64.b64decode(record["kinesis"]["data"])
            print("decoded payload", data)
            personalize_event = json.loads(data)
            put_personalize_event(
                tracking_id=TRACKING_ID,
                user_id=personalize_event["user_id"],
                session_id=personalize_event["session_id"],
                eventList=personalize_event["event_list"],
            )
        except Exception as e:
            print(
                "cannot process stream",
                dict(raw_data=record["kinesis"]["data"], error=str(e)),
            )


def put_personalize_event(tracking_id, user_id, session_id, eventList):
    personalize_events = boto3.client("personalize-events")
    personalize_events.put_events(
        trackingId=tracking_id,
        userId=user_id,
        sessionId=session_id,
        eventList=eventList,
    )
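For completeness, a producer only needs to put a JSON payload with that shape (user_id, session_id, event_list) onto the Kinesis stream. The sketch below assumes the dev stage, so the stream name resolves from the naming pattern in serverless.yml above; the IDs and event contents are made up:

import json
import time

import boto3

kinesis = boto3.client("kinesis")

# Payload shape expected by personalize_consume_event.py; values are hypothetical.
payload = {
    "user_id": "u-1001",
    "session_id": "session-456",
    "event_list": [
        {
            "eventType": "click",
            "itemId": "dashboard-42",
            "sentAt": int(time.time()),
        }
    ],
}

kinesis.put_record(
    StreamName="acme-c360-dashboard-recommender-PersonalizeStream-dev",  # assumes the dev stage
    Data=json.dumps(payload).encode("utf-8"),
    PartitionKey=payload["user_id"],
)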
Once you have the data back in S3, you can do 2 things: