作者 | Mike Moritz
来源 | Medium
编辑 | 代码医生团队
机器学习训练工作通常是时间和资源密集型的,因此将这一过程整合到实时自动化工作流程中可能会面临挑战。
尽管可以在Lambda上运行标准的Python TensorFlow库,但很可能许多应用程序很快会遇到部署包大小和/或执行时间的限制,或者需要其他计算选项。
本文将逐步介绍如何使数据管理和预测保持无服务器状态,但将训练工作加载到临时EC2实例。这种实例创建模式将基于为在云中运行具有成本效益的超参数优化而开发的一种模式。
将预测功能保留在Lambda中意味着由于加载TensorFlow而仍然可能存在大小限制。为了减轻这种情况,所有Lambda函数都将为Node.js编写,这也将允许使用TensorFlow.js而不是标准的Python库。
https://www.tensorflow.org/js
TensorFlow.js提供浏览器版本和Node版本,后者包含C ++绑定以提高性能。Node版本似乎是显而易见的选择,但是它可以解压缩为690MB,这使其立即不适合Lambda。鉴于我们不会在Lambda函数中进行训练,因此性能下降可以接受预测,因此将使用解压缩为55MB的浏览器版本。
对于基础的机器学习模型,将尝试基于以下输入参数来预测一个人的舒适度:
实际模型将使用通过TensorFlow的Keras API构建的简单(非优化)神经网络。
对于数据存储,我们将在DynamoDB中创建两个表:
环境设定
初始化
由于项目将与Node Lambda文件和Python EC2文件混合在一起,因此,将在文件夹结构中将它们分开,如下所示。还将利用Serverless框架,该框架将保留在顶层,而Node和Python部分将在各自的文件夹中初始化。
├── LambdaAutoTraining
│ ├── js
│ │ ├── ...
│ ├── py
│ │ ├── ...
首先,请安装Serverless并使用Node模板初始化一个新项目。应显示样板处理程序(handler.js)和配置文件(serverless.yml)。
$ npm install -g serverless
$ mkdir -p LambdaAutoTraining/{js,py}
$ cd LambdaAutoTraining
$ serverless create --template aws-nodejs
节点设置
导航到该js文件夹,初始化一个新的Node项目,然后安装Tensorflow.js(仅浏览器版本!)。
$ cd js
$ npm init
...follow prompts
$ npm install @tensorflow/tfjs
接下来,使用体系结构图作为指南,创建必要的JavaScript文件,这些文件将映射到最终的Lambda函数。
$ touch test.js upload.js train.js infer.js s3proxy.js
最后,将样板代码从复制handler.js到每个文件中,然后删除handler.js。
Python设置
导航到该py文件夹并创建一个新的虚拟环境。为了创建训练方案,将使用Jupyter笔记本,并且还将需要该tensorflowjs模块,以便可以将保存的模型转换为TensorFlow.js可以理解的格式。
$ cd ../py
$ pyenv virtualenv 3.7.3 autotraining
$ pyenv activate autotraining
$ pip install tensorflow tensorflowjs jupyter
$ pip freeze > requirements.txt
仅需要在本节中创建Jupyter笔记本文件和Dockerfile。Python文件将在Docker构建过程中创建。
$ touch train.ipynb Dockerfile
项目结构现在应如下所示:
├── LambdaAutoTraining
│ ├── serverless.yml
│ ├── js
│ │ ├── test.js
│ │ ├── upload.js
│ │ ├── train.js
│ │ ├── infer.js
│ │ ├── s3proxy.js
│ │ ├── package.json
│ │ ├── package_lock.json
| │ ├── node_modules
| │ │ ├── ...
│ ├── py
│ │ ├── requirements.txt
│ │ ├── train.ipynb
│ │ ├── Dockerfile
无服务器设置
该serverless.yml文件是项目的主要配置。首先删除文件中的所有样板文本(如果需要,可以稍后参考文档中的所有各种选项),然后开始构建提供程序部分。
与大多数AWSless Serverless示例的主要区别在于,将定义自己的IAM角色。通常role,该部分将替换为iamRoleStatements允许无服务器与其自己的整体IAM角色合并的自定义策略的部分。但是,需要将EC2包括为受信任的实体,而不能作为的一部分使用iamRoleStatements。稍后将在资源部分中对此进行构建。
环境部分使可以访问Lambda函数中与部署相关的变量。IAM_ROLE将需要创建EC2实例策略,并且API_URL两者都将使用它test.js并向infer.js的API Gateway端点进行调用。
service: lambdaautotraining
provider:
name: aws
runtime: nodejs10.x
stage: dev
region: us-east-1
role: IamRole
environment:
SERVICE: ${self:service}
REGION: ${self:provider.region}
BUCKET: ${self:service}-${opt:stage, self:provider.stage}
STAGE: ${opt:stage, self:provider.stage}
DYNAMODB_TABLE_MODELS: ${self:service}-models-${opt:stage, self:provider.stage}
DYNAMODB_TABLE_DATA: ${self:service}-data-${opt:stage, self:provider.stage}
FUNCTION_PREFIX: ${self:service}-${opt:stage, self:provider.stage}-
IAM_ROLE: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedrole
API_URL:
Fn::Join:
- ""
- - "https://"
- Ref: "ApiGatewayRestApi"
- ".execute-api.${self:provider.region}.amazonaws.com/${self:provider.stage}"
package:
exclude:
- py/**
# ...
接下来,使用图和创建的文件作为指南来定义每个功能。为简单起见,每个处理程序函数名称和API端点将与文件名相同。
upload,infer和s3proxy将通过API网关调用,因此将发生http事件。因为s3proxy将使用路径参数来定义所请求key的文件,并将其作为S3存储桶中的文件夹。
对于该train功能,将使用DynamoDB流触发器,该触发器将包含在资源部分中。当至少有一个新事件并且满足以下任一限制时,将触发此事件:
由于train将主要负责启动EC2实例,因此还将定义一些其他特定的环境变量。在此示例中,Docker映像将存储在AWS Docker注册表(ECR)中,但是也可以使用其他映像。
最后,test将仅用于手动触发,因此没有关联的事件。
# ...
functions:
upload:
handler: js/upload.upload
events:
- http:
path: upload
method: post
train:
handler: js/train.train
environment:
AMI_ID: ami-0f812849f5bc97db5
KEY_NAME: ec2testing
INSTANCE_TYPE: t2.micro
SPOT_DURATION: 60
VALID_HRS: 4
ECR_ID: 530583866435
ECR_REPO: lambda-auto-training/lambda-auto-training-${opt:stage, self:provider.stage}
events:
- stream:
type: dynamodb
arn:
Fn::GetAtt: [DataDynamoDbTable, StreamArn]
batchSize: 100
batchWindow: 300
infer:
handler: js/infer.infer
events:
- http:
path: infer
method: post
test:
handler: js/test.test
s3proxy:
handler: js/s3proxy.s3proxy
events:
- http:
path: s3proxy/{key}/{filename}
method: get
# ...
接下来,创建S3存储桶和两个DynamoDB表(在此阶段配置的吞吐量有限)。请注意,该data表还包含StreamSpecification将用于触发train功能的。
# ...
resources:
Resources:
Bucket:
Type: AWS::S3::Bucket
Properties:
BucketName: ${self:provider.environment.BUCKET}
ModelsDynamoDbTable:
Type: 'AWS::DynamoDB::Table'
DeletionPolicy: Retain
Properties:
AttributeDefinitions:
-
AttributeName: created
AttributeType: N
KeySchema:
-
AttributeName: created
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: ${self:provider.environment.DYNAMODB_TABLE_MODELS}
DataDynamoDbTable:
Type: 'AWS::DynamoDB::Table'
DeletionPolicy: Retain
Properties:
AttributeDefinitions:
-
AttributeName: created
AttributeType: N
KeySchema:
-
AttributeName: created
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: ${self:provider.environment.DYNAMODB_TABLE_DATA}
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
# ...
创建的最终资源是自定义IAM角色,该功能将由所有功能使用,并且无服务器文档提供了一个很好的起点模板。为了将角色从Lambda转移到EC2,需要做两件事:
https://serverless.com/framework/docs/providers/aws/guide/iam#one-custom-iam-role-for-all-functions
在本Policies节中,将首先复制默认的无服务器策略以进行日志记录和S3部署存储桶(通常会自动创建这些策略)。接下来,将为之前定义的S3存储桶和DynamoDB表添加自定义语句。请注意,在创建自定义策略时,不会自动创建DynamoDB流策略,因此需要显式定义它。
此外,将添加创建EC2实例所需的策略:
安全说明:在部署到生产环境之前,应将这些策略的范围缩小到仅所需的资源
# ...
IamRole:
Type: AWS::IAM::Role
Properties:
Path: /
RoleName: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedrole
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
- ec2.amazonaws.com # added for transfer in train.js
Action: sts:AssumeRole
Policies:
- PolicyName: ${self:service}-${opt:stage, self:provider.stage}-${self:provider.region}-managedpolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
# Begin default Serverless policies
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource:
- 'Fn::Join':
- ':'
-
- 'arn:aws:logs'
- Ref: 'AWS::Region'
- Ref: 'AWS::AccountId'
- 'log-group:/aws/lambda/*:*:*'
- Effect: "Allow"
Action:
- "s3:PutObject"
Resource:
Fn::Join:
- ""
- - "arn:aws:s3:::"
- "Ref" : "ServerlessDeploymentBucket"
# Begin custom policies
- Effect: Allow
Action:
- s3:*
Resource:
Fn::Join:
- ""
- - "arn:aws:s3:::"
- ${self:provider.environment.BUCKET}
- "/*"
- Effect: Allow
Action:
- dynamodb:Query
- dynamodb:Scan
- dynamodb:GetItem
- dynamodb:PutItem
- dynamodb:UpdateItem
- dynamodb:DeleteItem
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:DescribeStream
- dynamodb:ListStreams
- dynamodb:BatchWriteItem
Resource:
- "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_MODELS}"
- "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_DATA}"
- { Fn::GetAtt: [ DataDynamoDbTable, Arn ] }
# For custom role stream policy must be explicitly defined
- Effect: Allow
Action:
- dynamodb:GetRecords
- dynamodb:GetShardIterator
- dynamodb:DescribeStream
- dynamodb:ListStreams
Resource:
- "arn:aws:dynamodb:${opt:region, self:provider.region}:*:table/${self:provider.environment.DYNAMODB_TABLE_DATA}/stream/*"
- Effect: Allow
Action:
- ec2:RunInstances
- ec2:TerminateInstances
- cloudwatch:PutMetricAlarm
- cloudwatch:DescribeAlarms
- cloudwatch:EnableAlarmActions
Resource: "*"
# Allow for transferring role to EC2
- Effect: Allow
Action:
- iam:PassRole
- iam:GetInstanceProfile
- iam:CreateInstanceProfile
- iam:AddRoleToInstanceProfile
Resource: "*"
# Used only by EC2 when role is transfered
- Effect: Allow
Action:
- ecr:GetAuthorizationToken
- ecr:GetDownloadUrlForLayer
- ecr:BatchGetImage
- ecr:BatchCheckLayerAvailability
Resource:
- "*"
由于已经为每个功能添加了样板代码,因此现在可以部署并测试所有配置是否正确。
$ serverless deploy --stage dev
...
$ curl -X POST "https://<api_id>.execute-api.<region>.amazonaws.com/dev/upload"
Go Serverless v1.0! Your function executed successfully!
现在准备构建该应用程序!
Lambda:upload.js
该upload函数将新标记数据的数组作为输入,并将其存储在DynamoDB表中。然后,此更新将启动流触发器以启动该train功能。
在upload.js第一个导入和设置的AWS SDK。由于此功能是从HTTP事件触发的,因此将读取该body字段,然后构造一个代表单个DynamoDB插入项的对象数组。请注意,即使字段具有不同的类型(例如,数字和字符串分别为“ N”或“ S”),实际值也需要作为字符串传递。
'use strict';
// Load the SDK for JavaScript
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});
module.exports.upload = (event, context, callback) => {
var data = event.body
if (typeof data == 'string') {
data = JSON.parse(data)
}
// construct put request items
var requestItems = [];
data.forEach(item => {
requestItems.push({
PutRequest: {
Item: {
created: { "N": item.created.toString() },
temp: { "N": item.temp.toString() },
rh: { "N": item.rh.toString() },
wind: { "N": item.wind.toString() },
clo: { "N": item.clo.toString() },
label: { "S": item.label.toString() },
score: { "N": item.score.toString() },
}
}
});
});
if (requestItems.length == 0) {
callback(null, {
statusCode: 200,
headers: {'Content-Type': 'text/plain'},
body: 'No records created since request body length was zero',
});
return;
}
// ....
如果有新的项目来写,将建立一个新的对象,然后使用batchWriteItem从DynamoDB AWS SDK写的新项目。batchWriteItem比一系列putItem请求更有效,而且也是原子的。
https://docs.aws.amazon.com/sdk-for-javascript/v2/developer-guide/dynamodb-example-table-read-write-batch.html
// ...
var params = {
RequestItems: {
[process.env.DYNAMODB_TABLE_DATA]: requestItems
}
};
console.log('params>>>', params);
ddb.batchWriteItem(params, function(err, data) {
if (err) {
console.log("Error", err);
callback(Error(err), {
statusCode: 500,
headers: {'Content-Type': 'text/plain'},
body: 'Error encountered during record creation',
});
} else {
console.log("Success", data);
callback(null, {
statusCode: 200,
headers: {'Content-Type': 'text/plain'},
body: requestItems.length.toString() + ' records successfully created',
});
}
});
};
现在已经构建了upload功能,还可以test.js扩展生成随机数据来测试工作流程并填充数据库。有关详细信息,请参见Github文件。
https://github.com/mikepm35/LambdaAutoTraining/blob/master/js/test.js
重新部署到该dev阶段并测试端点。此时,开始用数据填充DynamoDB非常有价值,可以通过手动调用该test.js函数来完成。
$ severless deploy --stage dev
...
$ curl -X POST "https://<api_id>.execute-api.<region>.amazonaws.com/dev/upload" -d '[{"created": 1570323309012, "temp": 75, "rh": 60, "wind": 0.6, "clo": 1.0, "label": "ok", "score": -1}]'
1 records created successfully
尽管该train.js函数尚未构建,但一旦达到批处理窗口或批处理大小,仍应看到它已被调用。
EC2:train.py
完成上传新数据的功能后,现在将重点转移到Python训练部分。将焦点转移到此处而不是完成JavaScript Lambda函数的动机是,train如果完成了EC2 / ECR集成,则可以更轻松地验证功能,否则将无法验证启动脚本是否正常运行。
要开始使用TensorFlow模型,请打开Jupyter Notebook(虚拟环境应该已经激活)。
$ cd py
$ jupyter notebook
打开train.ipynb,先前已将其创建为空文件。希望将关键字段作为环境参数传递给Docker容器,但是为了便于测试,将提供这些值。接下来,创建代表两个DynamoDB表的变量。
对于输入数据,将对DynamoDB数据表执行扫描。在LastEvaluatedKey将存在如果结果被分页,当响应是大于1MB恰好。
DynamoDB返回一个Decimal数据类型,因此将遍历数据集并转换为浮点以及对标签数据进行一次热编码。最后,此列表将转换为numpy数组,以输入到TensorFlow模型中。
为了创建模型,将使用TensorFlow的Keras API,更具体地说是使用顺序模型,该模型允许构建神经网络的各个层。本文的重点不是超参数优化,因此将使用非常简单的配置。重要的是要注意,必须定义输入形状,以便以后导入TensorFlow.js。
模型完成后,将使用tfjs模块中的转换器将其直接保存为TensorFlow.js可以导入的形式。然后将这些文件上传到S3并以当前纪元为键将其上传到新文件夹中。还将维护“最新”文件夹,以定义客户端应使用哪种模型进行预测。最后,每个模型拟合的结果将存储model在DynamoDB 中的表中。
由于data应该填充该表,因此现在可以在本地运行此笔记本并验证功能。
在模型开发完成之后,现在将开始准备Docker映像,首先是Dockerfile,该文件提供了构建映像的说明。
打开Dockerfile进行编辑,并如下所示进行更新,其用途如下:
FROM python:3.7
RUN echo $(python3 --version)
RUN useradd -ms /bin/bash lambdaautotraining
WORKDIR /home/lambdaautotraining
RUN apt-get update -y
COPY train.ipynb requirements.txt ./
RUN pip install -r requirements.txt
RUN chown -R lambdaautotraining:lambdaautotraining ./
USER lambdaautotraining
RUN jupyter nbconvert --to script train.ipynb
CMD ["python3","-u","train.py"]
接下来,在本地构建Docker容器,用ECR URI标记,登录到ECR,然后推送到存储库。
$ docker build -t lambda-auto-training-dev .
...
$ docker tag \
lambda-auto-training-dev:latest \
<ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-dev:latest
$ $(aws ecr get-login --no-include-email --region <region>)
...
$ docker push \
<ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-dev:latest
可以手动启动EC2实例并执行命令以运行该映像,但是将创建触发Lambda函数并对其进行测试。
Lambda:train.js
trainLambda函数的主要目的是对新一批带标签的数据做出反应,然后启动一个新的EC2实例,以完全执行训练工作流程。与upload使用回调样式处理程序的函数不同,这里将使用async / await模式。
在此函数中定义的第一个变量是初始化脚本,该脚本将传递到EC2实例以进行启动。作为单独的shell脚本值得测试,但为简单起见,此处仅以字符串形式显示。该脚本的主要职责如下:
请注意,该run命令具有一系列环境属性,这些属性是通过replace语句定义的。这些将在的训练Python脚本中用于与DynamoDB和S3进行交互。
最后,该字符串需要根据EC2要求进行base64编码。
'use strict';
const child_process = require("child_process");
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const iam = new AWS.IAM();
const ec2 = new AWS.EC2({apiVersion: '2016-11-15'});
const cw = new AWS.CloudWatch({apiVersion: '2010-08-01'});
module.exports.train = async event => {
// Create init script
var userData = `#!/bin/bash
sudo yum install -y unzip
curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
unzip awscli-bundle.zip
sudo ./awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws
$(aws ecr get-login --no-include-email --region <region>)
docker pull <ecr_id>.dkr.ecr.<region>.amazonaws.com/<repo>:latest
docker run --name keras-remote-training \
-e "TABLE_MODELS=<table_models>" -e "TABLE_DATA=<table_data>" \
-e "BUCKET=<bucket>" -e "REGION=<region>"\
-d --rm <ecr_id>.dkr.ecr.<region>.amazonaws.com/<repo>:latest
`;
userData = userData.replace(/<region>/g, process.env.REGION);
userData = userData.replace(/<ecr_id>/g, process.env.ECR_ID);
userData = userData.replace(/<repo>/g, process.env.ECR_REPO);
userData = userData.replace(/<table_models>/g, process.env.DYNAMODB_TABLE_MODELS);
userData = userData.replace(/<table_data>/g, process.env.DYNAMODB_TABLE_DATA);
userData = userData.replace(/<bucket>/g, process.env.BUCKET);
console.log('Final userData>>> ', userData);
var userDataBuff = new Buffer(userData);
var userDataBase64 = userDataBuff.toString('base64');
console.log('Base64 encoded userData>>>', userDataBase64);
// ...
接下来,检索实例配置文件,该配置文件定义了EC2实例将使用的IAM角色。每个需要阻止的调用都使用带有await关键字的promise表单。
// ...
// Get instance profile
var profileParams = {
InstanceProfileName: process.env.SERVICE+'-'+process.env.STAGE,
}
var profileData = null;
try {
profileData = await iam.getInstanceProfile(profileParams).promise();
} catch (err) {
if (err.statusCode == 404) {
console.log('Instance profile does not exist, creating');
profileData = await iam.createInstanceProfile(profileParams).promise();
child_process.execSync("sleep 10"); // alllows new profile to propogate
profileParams.RoleName = process.env.IAM_ROLE;
var roleData = await iam.addRoleToInstanceProfile(profileParams).promise();
console.log('Added role to instance profile');
} else {
console.error('Unexpected error getting instance profile: ', err);
return {
statusCode: 500,
body: JSON.stringify({error: "Error getting instance profile"}),
};
}
}
// ...
有了实例配置文件,将为竞价型实例定义完整的EC2参数集。另一种选择是分别创建一个模板并直接启动它。还将在关闭时终止实例,这里的另一项优化是根据需要停止/启动持久实例。
// ...
// Set EC2 instance parameters
var instanceParams = {
ImageId: process.env.AMI_ID,
InstanceType: process.env.INSTANCE_TYPE,
KeyName: process.env.KEY_NAME,
MinCount: 1,
MaxCount: 1,
UserData: userDataBase64,
InstanceInitiatedShutdownBehavior: 'terminate',
InstanceMarketOptions: {
MarketType: 'spot',
SpotOptions: {
BlockDurationMinutes: process.env.SPOT_DURATION,
InstanceInterruptionBehavior: 'terminate',
SpotInstanceType: 'one-time',
}
},
IamInstanceProfile: {
Arn: profileData.InstanceProfile.Arn,
},
};
// ...
现在,准备开始创建EC2。成功后,将创建并启用警报,当CPU降至某个阈值以下时,该警报将自动终止实例,将其用作完成训练的代理。
// ...
// Create EC2 instance
try {
var instanceData = await ec2.runInstances(instanceParams).promise();
var instanceId = instanceData.Instances[0].InstanceId;
console.log("Created instance", instanceId);
// Create alarm to auto-terminate
var alarmParams = {
AlarmName: process.env.SERVICE+'_CPU_Utilization_'+instanceId,
ComparisonOperator: 'LessThanThreshold',
EvaluationPeriods: 2,
MetricName: 'CPUUtilization',
Namespace: 'AWS/EC2',
Period: 60*5,
Statistic: 'Average',
Threshold: 5.0,
ActionsEnabled: true,
AlarmActions: ['arn:aws:automate:'+process.env.REGION+':ec2:terminate'],
AlarmDescription: 'Termination alarm for CPU below 5%',
Dimensions: [
{
Name: 'InstanceId',
Value: instanceId
},
],
Unit: 'Percent'
};
var alarmData = await cw.putMetricAlarm(alarmParams).promise();
console.log('Alarm created: ', alarmData);
// Enable action on alarm
var paramsEnableAlarmAction = {
AlarmNames: [alarmParams.AlarmName]
};
var actionData = await cw.enableAlarmActions(paramsEnableAlarmAction).promise();
console.log("Alarm action enabled", actionData);
return {
statusCode: 200,
body: JSON.stringify({result: "Instance created"}),
};
} catch (err) {
console.error('Error creating instance: ', err);
return {
statusCode: 500,
body: JSON.stringify({error: "Error creating instance"}),
};
}
};
现在,可以使用新功能更新开发环境。
$ cd ../js
$ serverless deploy --stage dev
在已经确认可以触发train.js工作后,将使用控制台测试训练工作流程。在AWS中,打开Lambda,DynamoDB,S3和EC2的服务页面并执行以下操作:
Lambda:infer.js
完成完整的训练工作流程后,现在就可以构建预测/推断部分。主要目的infer是下载模型,加载到TensorFlow.js中,然后根据HTTP触发器提供给它的一组输入进行预测。该函数期望输入为对象数组,其键代表所需的模型输入字段。
TensorFlow.js的浏览器版本使用fetch的不是Node.js中的标准版本。为了解决这个问题,将安装node-fetch,并在fetch全局范围内使用它。
$ npm install node-fetch
接下来,必须下载模型。将再次需要解决以下事实:正在使用浏览器版本,该版本不希望访问标准本地文件系统。可以从tfjs-node项目中提取必要的模块,但是在本示例中,将利用中的直接HTTP下载选项loadLayersModel。
但是,由于S3存储桶尚未对外开放,因此需要确定如何允许这种访问。对于使用签名URL的 HTTP访问S3 是一个合理的选择,但是在下载步骤TensorFlow实际上在做两件事:
https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#getSignedUrl-property
要解决此问题,将使用单独的代理,该代理将接收每个请求并将其重定向到适当的签名url。更新s3proxy以支持此操作,如下所示:
'use strict';
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const s3 = new AWS.S3();
module.exports.s3proxy = async event => {
var filename = 'models/latest/model.json';
console.log('Final filename: ', filename);
var signedUrl = s3.getSignedUrl('getObject', {
Bucket: process.env.BUCKET,
Key: filename,
Expires: 100 // seconds
});
console.log('Signed url: ', signedUrl);
return {
statusCode: 302,
headers: {
Location: signedUrl
}
};
};
回到infer函数,加载模型,将输入转换为2D张量并运行预测。arraySync会将结果转换为标准浮点数,并将每组输入转换为跨输出维度的一组预测。通过找到最大值,此预测将转换为简单的标签映射,然后在新的JSON对象中返回。
'use strict';
const tf = require('@tensorflow/tfjs');
// See https://github.com/tensorflow/tfjs/issues/2029
const nodeFetch = require('node-fetch');
global.fetch = nodeFetch;
const AWS = require('aws-sdk');
AWS.config.update({region: process.env.REGION});
const ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});
module.exports.infer = async event => {
// Expecting to be array of json objects with keys matching inputs
var data = event.body
if (typeof data == 'string') {
data = JSON.parse(data)
}
var input = [];
for (var entry in data) {
input.push([entry.temp, entry.clo, entry.rh, entry.wind]);
}
// Download and load the model
var url = process.env.API_URL + '/s3proxy/latest/model.json';
console.log('s3proxy url: ', url);
const model = await tf.loadLayersModel(url);
// Run inference with predict()
var predictResult = model.predict(tf.tensor2d(input));
var resultArray = predictResult.arraySync();
// Construct output
var output = [];
for (var entry in resultArray) {
var maxIndex = entry.indexOf(Math.max(...entry));
var simpleResult = null;
switch (maxIndex) {
case 0:
simpleResult = 'cold';
break;
case 1:
simpleResult = 'ok';
break;
case 2:
simpleResult = 'warm';
break;
}
output.push({
simpleResult: simpleResult,
rawResult: entry,
});
}
return {
statusCode: 200,
body: JSON.stringify(output),
};
};
测试整个工作流程
如果创建了该test功能,则可以设置cron作业以在定义的时间间隔执行,以模拟实际流量。为此,需要将CloudWatch事件触发器(默认禁用)添加到我们的serverless.yml配置中:
test:
handler: js/test.test
events:
- schedule:
rate: rate(2 minutes)
enabled: false
手动启用触发器可能会有些混乱,因为Lambda UI将其显示为“已启用”,但是实际上您需要转到CloudWatch启用底层事件:
问题在于,AWS::Events::RuleLambda UI中既有一个(可以启用/禁用),也有一个触发器(可以启用/禁用)。Lambda UI显示触发状态,该状态已启用。但是,实际上无法通过CloudFormation来解决这一问题。该AWS::Events::Rule设置为禁用,这是设定CloudFormation。如果触发器或规则被禁用,它将不会触发您的Lambda。
对于预测方面,可以像以前一样手动进行测试,或者扩展测试功能策略以包括推断。
准备就绪后,现在可以部署到“生产”阶段。对于Docker映像,将向现有映像添加一个新标签,并将其推送到生产存储库中。
$ serverless deploy --stage prod
...
$ docker tag \
lambda-auto-training-dev:latest \
<ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-prod:latest
...
$ docker push \
<ecr_id>.dkr.ecr.<region>.amazonaws.com/lambda-auto-training/lambda-auto-training-prod:latest
最后的想法
鉴于这是一个原型,在部署到实际生产环境之前,应考虑许多方面:
可以在GitHub上查看所有代码:
https://github.com/mikepm35/LambdaAutoTraining