※この記事は「AWSでサーバーレスなバッチ処理を作るハンズオン」の3章です。こちらの記事からスタートできます。
ここでは、いよいよLambda関数を使って今回のバッチ処理のビジネスロジックを実装していきます。
リポジトリ
この章における変更箇所は以下のコミットで確認できます。
3章 ビジネスロジックをLambdaに実装 · SRsawaguchi/simple-serverless-batch@bf18dad · GitHub
また、このハンズオン全体のリポジトリはこちらです。
実装するLambda関数
今回開発するバッチ処理は以下でしたね。
「DynamoDBから取得したデータを使って、PDFのレポートを作成してS3バケットに保存する。」
今回は全部1つのLambda関数に実装せず、複数の関数に分けます。
関数名 | 内容 |
---|---|
MakeReport | DybamoDBから取得したデータをHTMLでS3にアップロードする。 |
HtmlToPdf | S3からMarkdownファイルをダウンロードし、それを元に生成したPDFをS3にアップロードする。 |
HTMLファイルを作成する処理とPDFを作成する処理が分かれていることに注意してください。
以下のように実行することを想定します。
では開発を初めて行きましょう!
MakeReport
ディレクトリとrequirements.txtの作成
まず、このLambda関数用のディレクトリを作成します。
Lambda関数は./functions
ディレクトリに保存するということを思い出してください。
今回はテンプレートを元に開発を始めているので、既存のディレクトリを有効活用しちゃいます。
cp -r functions/stock_buyer functions/make_report
functions/make_report
にテンプレートのファイルが一式コピーされてきます。
まずは、functions/make_report/requirements.txt
を以下に書き換えます。
pystache == 0.5.4
ビジネスロジックの実装
続いて、functions/make_report/app.py
を書き換えます。
やや汚いですが、なるべく最初はなるべくファイル構成をシンプルにしたいので、1つのファイルに全て書いてしまいます。
import datetime import os import tempfile from botocore.exceptions import ClientError import boto3 import pystache class Config: @staticmethod def or_none(val): is_empty = val is None or val == "" return None if is_empty else val def __init__( self, bucket_name, dynamodb_table_name, s3_endpoint=None, dynamodb_endpoint=None, minio_user=None, minio_password=None, ): self.bucket_name = bucket_name self.dynamodb_table_name = dynamodb_table_name self.s3_endpoint = self.or_none(s3_endpoint) self.dynamodb_endpoint = self.or_none(dynamodb_endpoint) self.minio_user = self.or_none(minio_user) self.minio_password = self.or_none(minio_password) def get_target_date(): return datetime.date(2021, 11, 28) def upload_file(bucket_name, file_path, object_name, **kwargs): s3 = boto3.resource("s3", **kwargs) bucket = s3.Bucket(bucket_name) bucket.upload_file(file_path, object_name) return object_name def get_message(target_date, table_name, endpoint_url=None): dynamodb = boto3.resource("dynamodb", endpoint_url=endpoint_url) table = dynamodb.Table(table_name) str_date = target_date.strftime("%Y/%m/%d") try: response = table.get_item(Key={"Date": str_date}) except ClientError as e: print(e.response["Error"]["Message"]) raise e else: if "Item" in response: return response["Item"]["Message"] return None def get_object_name(target_date: datetime.date, ext=".html"): str_date = target_date.strftime("%Y_%m_%d") return f"{str_date}{ext}" def make_report(target_date, out_dir, config: Config): template = """ <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Message</title> </head> <body> {{{ message }}} </body> </html> """ msg = get_message(target_date, config.dynamodb_table_name, config.dynamodb_endpoint) html = pystache.render(template, {"message": msg}) object_name = get_object_name(target_date) html_path = os.path.join(out_dir, os.path.basename(object_name)) with open(html_path, mode="w") as f: f.write(html) upload_file( config.bucket_name, html_path, object_name, endpoint_url=config.s3_endpoint, aws_access_key_id=config.minio_user, aws_secret_access_key=config.minio_password, ) return object_name def lambda_handler(event, context): config = Config( bucket_name=os.environ.get("SSB_BUCKET_NAME"), dynamodb_table_name=os.environ.get("SSB_DYNAMODB_TABLE_NAME"), s3_endpoint=os.environ.get("SSB_S3_ENDPOINT"), dynamodb_endpoint=os.environ.get("SSB_DYNAMODB_ENDPOINT"), minio_user=os.environ.get("SSB_MINIO_USER"), minio_password=os.environ.get("SSB_MINIO_PASSWORD"), ) with tempfile.TemporaryDirectory() as temp_dir: object_name = make_report(get_target_date(), temp_dir, config) return { "BucketName": config.bucket_name, "ObjectName": object_name, }
※LinterとしてBlackを使っています。設定は全てデフォルトです。
template.yamlの作成
続いて、./template.yaml
を書き換えます。
既存の内容は全て消去し、以下のようにします。
AWSTemplateFormatVersion: "2010-09-09" Transform: AWS::Serverless-2016-10-31 Description: > simple-serverless-batch Sample SAM Template for simple-serverless-batch Globals: Function: Runtime: python3.8 Timeout: 15 Parameters: TableName: Type: String Description: "DynamoDB Table Name" Default: "Messages" DynamoDBEndpoint: Type: String Description: "Endpoint of DynamoDB Local. If api runs in aws, set it empty string." Default: "" S3Endpoint: Type: String Description: "Endpoint of S3 Local(minio). If api runs in aws, set it empty string." Default: "" S3BucketName: Type: String Description: "Bucket name of S3" MinioRootUser: Type: String Description: "Username for Minio" MinioRootPassword: Type: String Description: "Password for Minio." Resources: ################## # DynamoDB Table # ################## MessagesTable: Type: AWS::Serverless::SimpleTable Properties: TableName: !Ref TableName PrimaryKey: Name: Date Type: String ProvisionedThroughput: ReadCapacityUnits: 1 WriteCapacityUnits: 1 ############## # IAM POLICY # ############## DynamoDBAccessPolicy: Type: AWS::IAM::Policy Properties: PolicyName: "DynamoDB-student-activity" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "dynamodb:BatchGetItem" - "dynamodb:BatchWriteItem" - "dynamodb:BatchWriteItem" - "dynamodb:DescribeTable" - "dynamodb:GetItem" - "dynamodb:PutItem" - "dynamodb:Query" - "dynamodb:UpdateItem" Resource: !Sub "arn:aws:dynamodb:*:*:table/${TableName}" Roles: - !Ref MakeReportFunctionRole S3BucketAccessPolicy: Type: AWS::IAM::Policy Properties: PolicyName: "S3-bucket-student-activity" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:PutObject" - "s3:GetObject" Resource: !Sub "arn:aws:s3:::${S3BucketName}/*" Roles: - !Ref MakeReportFunctionRole ############## # IAM ROLE # ############## MakeReportFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole ############## # Function # ############## MakeReportFunction: Type: AWS::Serverless::Function Properties: CodeUri: functions/make_report/ Handler: app.lambda_handler Environment: Variables: SSB_BUCKET_NAME: !Ref S3BucketName SSB_DYNAMODB_ENDPOINT: !Ref DynamoDBEndpoint SSB_DYNAMODB_TABLE_NAME: !Ref TableName SSB_MINIO_PASSWORD: !Ref MinioRootPassword SSB_MINIO_USER: !Ref MinioRootUser SSB_S3_ENDPOINT: !Ref S3Endpoint Role: !GetAtt MakeReportFunctionRole.Arn
YAMLファイルについて
template.yaml
も写経して書き方を覚えないと、、、と思っている方は、その考え方を捨ててください。
自分でいちから作成する場合は、なるべく公式ドキュメントに載っているExampleから要件に近いものをコピーしてください。
その際は、SAMだけでなく、Cloud Formationのドキュメントを参照することもあります。
コピーしたExampleをテンプレートとし、加筆修正していく感じで作成すると良いです。
普通のPythonなどのプログラムと違って、AWSリソースの作成には一定の時間がかかります。そのため、ちょっとしたミスでデプロイが失敗してYAMLを書き直す、、、を繰り返すと最終的にかなりの時間を消費します。
ドキュメントで提供されているサンプルコードをテンプレとして書き始めることで、小さなミスを軽減できます。
それに、何より各AWSリソースのYAMLの項目の名前や仕様を全部覚えるのは不可能です。
sam invokeでLambda関数を実行
ではさっそく、実行してみましょう。
その前に、Dockerのnetworkを作ります。
後ほど実行しますが、SAMはsam invoke
というコマンドを使うことで、Lambda関数をローカルで実行できます。
その際、Lambda関数のコンテナを接続するdocker networkを指定できます。今回は、Lambda関数とdocker compose
で立ち上げた各種サーバが通信できるよう、ssb-handson
というネットワークを作ります。
docker network create ssb-handson
さらに、docker-compose.yaml
の末尾に以下の記述を追加し、docker compose up
を実行し直してください。
networks: ssb-handson: external: true name: ssb-handson
以下のコマンドを実行します。
sam build && \ sam local invoke MakeReportFunction \ --docker-network ssb-handson \ --parameter-overrides \ DynamoDBEndpoint=http://dynamodb:8000 \ S3Endpoint=http://minio:9000 \ S3BucketName=handson-bucket \ MinioRootUser=root \ MinioRootPassword=himitsu123
(注意) template.yaml
を含む、ソースコードを変更した場合は都度sam build
を行う必要があります。
ただし、sam build
はPCのスペックによっては割と時間がかかります。その場合、以下のように関数を指定するとbuild時間を削減できます。
sam build MakeReportFunction
Minioに2021_11_28.html
というファイルがアップロードされれば問題なく動作しています。
なお、Minioのデータはホストマシンの./docker/minio/data/
に同期されているので、Minioコンソールから探すのが面倒であればホストマシンから直接ファイルを開いてください。
HtmlToPdf
HtmlToPdf
はS3上にあるHTMLファイルをPDFに変換するLambda関数です。
ディレクトリとrequirements.txtの作成
テンプレートを活用します。
cp -r functions/stock_buyer functions/html_to_pdf
requirements.txt
は以下のようになります。
pdfkit == 1.0.0
このpdfkit
というのは、python-pdfkitのことです。
このライブラリを使ってHTMLファイルをPDFに変換するのですが、このライブラリはwkhtmltopdfに依存しています。これは実行環境にあらかじめインストールしておく必要があります。
今回は、Lambda関数にレイヤーを追加して対応します。つまり、wkhtmltopdf
の実行バイナリをレイヤーとしてLambda関数に追加し、それをPythonのプログラムから使います。
Lambdaレイヤーの追加
では、追加していきます。
LambdaではレイヤーとしてZipファイルが利用できます。実際に関数が起動する際に、Zipが展開される、結果としてプログラムから参照できるということです。
wkhtmltopdf
では、Lambda関数のレイヤーとして利用するためのZipを公開しています。
以下のダウンロードサイトから、Amazon Linux向けのlambda.zip
をダウンロードします。
ダウンロードしたら、それをfunctions/layer/wkhtmltopdf
に配置しましょう。
以下のような構造になります。
. ├── __init__.py ├── layer │ └── wkhtmltopdf │ └── wkhtmltox-0.12.6-4.amazonlinux2_lambda.zip ~
続いて、template.yaml
にLambdaレイヤーのリソースを追加します。
Resources
配下に以下のコードを追加します。
WkhtmltopdfLambdaLayer: Type: AWS::Serverless::LayerVersion Properties: Description: "Binary of wkhtmltopdf. see https://wkhtmltopdf.org/downloads.html" ContentUri: functions/layer/wkhtmltopdf/wkhtmltox-0.12.6-4.amazonlinux2_lambda.zip
wkhtmltopdf
はバージョンアップするかもしれません。執筆当時とZipファイル名が変更になっている可能性があるため、実際にダウンロードしたZipのファイル名に読み替えてください。
なお、このZipファイルはAWSによって自動で展開され、/opt/bin/wkhtmltopdf
としてアクセスできるようになります。
ビジネスロジックの実装
では、ビジネスロジックを実装していいきます。
functions/html_to_pdf/app.py
を以下のように書き換えます。
import os import tempfile import boto3 import pdfkit class Config: @staticmethod def or_none(val): is_empty = val is None or val == "" return None if is_empty else val def __init__( self, s3_endpoint=None, minio_user=None, minio_password=None, wkhtmltopdf_path=None, ): self.s3_endpoint = self.or_none(s3_endpoint) self.minio_user = self.or_none(minio_user) self.minio_password = self.or_none(minio_password) self.wkhtmltopdf_path = self.or_none(wkhtmltopdf_path) def download_file(bucket_name, object_name, dist_dir, **kwargs): s3 = boto3.resource("s3", **kwargs) bucket = s3.Bucket(bucket_name) object_path = os.path.join(dist_dir, os.path.basename(object_name)) bucket.download_file(object_name, object_path) return object_path def upload_file(bucket_name, file_path, object_name, **kwargs): s3 = boto3.resource("s3", **kwargs) bucket = s3.Bucket(bucket_name) bucket.upload_file(file_path, object_name) return object_name def html_to_pdf(bucket_name, object_name, out_dir, config: Config): html_path = download_file( bucket_name, object_name, out_dir, endpoint_url=config.s3_endpoint, aws_access_key_id=config.minio_user, aws_secret_access_key=config.minio_password, ) pdf_path = os.path.join( out_dir, os.path.splitext(os.path.basename(object_name))[0] + ".pdf" ) options = { "enable-local-file-access": None, "header-right": "Simple Serverless Batch", "footer-right": "[page]/[topage]", } pdfkit_config = None if config.wkhtmltopdf_path: pdfkit_config = pdfkit.configuration(wkhtmltopdf=config.wkhtmltopdf_path) pdfkit.from_file(html_path, pdf_path, options=options, configuration=pdfkit_config) pdf_object_name = upload_file( bucket_name, pdf_path, os.path.basename(pdf_path), endpoint_url=config.s3_endpoint, aws_access_key_id=config.minio_user, aws_secret_access_key=config.minio_password, ) return pdf_object_name def lambda_handler(event, context): config = Config( s3_endpoint=os.environ.get("SSB_S3_ENDPOINT"), minio_user=os.environ.get("SSB_MINIO_USER"), minio_password=os.environ.get("SSB_MINIO_PASSWORD"), wkhtmltopdf_path=os.environ.get("SSB_WKHTMLTOPDF_PATH"), ) bucket_name = event["BucketName"] html_object_name = event["ObjectName"] with tempfile.TemporaryDirectory() as temp_dir: pdf_object_name = html_to_pdf(bucket_name, html_object_name, temp_dir, config) return { "BucketName": bucket_name, "ObjectName": pdf_object_name, }
フォントの追加
Lambda関数の実行環境にはフォントがインストールされていません。そのため。PDFの文字が豆腐(■)になります。
そこで、フォントを追加します。
Lambda関数の実行環境であるAmazon LinuxではFontconfigが利用できます。
今回はこの仕組みを活用してフォントを使えるようにします。
まずはディレクトリを作成しましょう。
mkdir -p functions/html_to_pdf/assets/fonts
作成したfunctions/html_to_pdf/assets/fonts
に以下のファイルを作成します。
touch functions/html_to_pdf/assets/fonts/fonts.conf
そして、以下の内容を書き込みます。
<?xml version="1.0"?> <!DOCTYPE fontconfig SYSTEM "fonts.dtd"> <fontconfig> <dir>/var/task/assets/fonts</dir> <dir>/opt/fonts</dir> <cachedir>/tmp/fonts-cache</cachedir> <config></config> </fontconfig>
さらに以下のURLより、IPAゴシックのフォントをダウンロードし、functions/html_to_pdf/assets/font
に展開します。
IPAex フォント Ver.004.01 | 一般社団法人 文字情報技術促進協議会
functions/html_to_pdf/assets
は以下のような構造になります。
functions/html_to_pdf/assets └── fonts ├── fonts.conf └── ipaexg00401 ├── IPA_Font_License_Agreement_v1.0.txt ├── Readme_ipaexg00401.txt └── ipaexg.ttf
template.yamlの更新
以下の2つのリソースをtemplate.yaml
に追加してください。
HtmlToPdfFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole HtmlToPdfFunction: Type: AWS::Serverless::Function Properties: CodeUri: functions/html_to_pdf/ Handler: app.lambda_handler Layers: - !Ref WkhtmltopdfLambdaLayer Environment: Variables: FONTCONFIG_PATH: /var/task/assets/fonts SSB_MINIO_PASSWORD: !Ref MinioRootPassword SSB_MINIO_USER: !Ref MinioRootUser SSB_S3_ENDPOINT: !Ref S3Endpoint SSB_WKHTMLTOPDF_PATH: /opt/bin/wkhtmltopdf Role: !GetAtt HtmlToPdfFunctionRole.Arn
そして、IAM PolicyであるS3BucketAccessPolicy
のRoles
のリストにHtmlToPdfFunctionRole
を追加してください。
S3BucketAccessPolicy: Type: AWS::IAM::Policy Properties: PolicyName: "S3-bucket-student-activity" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:PutObject" - "s3:GetObject" Resource: !Sub "arn:aws:s3:::${S3BucketName}/*" Roles: - !Ref MakeReportFunctionRole - !Ref HtmlToPdfFunctionRole # <----------- 追加
sam invokeでLambda関数を実行
さっそくLambda関数を実行していきます。
その前に、イベントを作成します。
touch html_to_pdf_sample_event.json
html_to_pdf_sample_event.json
に以下の内容を書き込んでおきます。
{ "BucketName": "handson-bucket", "ObjectName": "2021_11_28.html" }
では、作成したイベントを使ってLambda関数を実行しましょう。
sam build && \ sam local invoke HtmlToPdfFunction \ --docker-network ssb-handson \ --event html_to_pdf_sample_event.json \ --parameter-overrides \ S3Endpoint=http://minio:9000 \ S3BucketName=handson-bucket \ MinioRootUser=root \ MinioRootPassword=himitsu123
実行後に、Minio
に2021_11_28.pdf
がアップロードされていればOKです。
次のステップ
次は、Step FunctionsとEvent Bridgeを使ってこの一連の処理の呼び出し、およびトリガーを実装していきましょう。
いよいよAWSにデプロイします。