おひとり

できる限りひとりで楽しむための情報やプログラミング情報など。

【サーバーレスバッチ処理ハンズオン】3章 ビジネスロジックをLambdaに実装

※この記事は「AWSでサーバーレスなバッチ処理を作るハンズオン」の3章です。こちらの記事からスタートできます。

f:id:hitoridehitode:20211205160603p:plain
ビジネスロジックをLambdaに実装

ここでは、いよいよLambda関数を使って今回のバッチ処理のビジネスロジックを実装していきます。

リポジトリ

この章における変更箇所は以下のコミットで確認できます。

3章 ビジネスロジックをLambdaに実装 · SRsawaguchi/simple-serverless-batch@bf18dad · GitHub

また、このハンズオン全体のリポジトリはこちらです。

github.com

実装するLambda関数

今回開発するバッチ処理は以下でしたね。
「DynamoDBから取得したデータを使って、PDFのレポートを作成してS3バケットに保存する。」

今回は全部1つのLambda関数に実装せず、複数の関数に分けます。

関数名 内容
MakeReport DybamoDBから取得したデータをHTMLでS3にアップロードする。
HtmlToPdf S3からMarkdownファイルをダウンロードし、それを元に生成したPDFをS3にアップロードする。

HTMLファイルを作成する処理とPDFを作成する処理が分かれていることに注意してください。
以下のように実行することを想定します。

f:id:hitoridehitode:20211205154909p:plain
Lambda関数の呼び出しは次章でStep Functionsを使って実装します。

では開発を初めて行きましょう!

MakeReport

ディレクトリとrequirements.txtの作成

まず、このLambda関数用のディレクトリを作成します。
Lambda関数は./functionsディレクトリに保存するということを思い出してください。
今回はテンプレートを元に開発を始めているので、既存のディレクトリを有効活用しちゃいます。

cp -r functions/stock_buyer functions/make_report

functions/make_reportにテンプレートのファイルが一式コピーされてきます。

まずは、functions/make_report/requirements.txtを以下に書き換えます。

pystache == 0.5.4

ビジネスロジックの実装

続いて、functions/make_report/app.pyを書き換えます。
やや汚いですが、なるべく最初はなるべくファイル構成をシンプルにしたいので、1つのファイルに全て書いてしまいます。

import datetime
import os
import tempfile

from botocore.exceptions import ClientError
import boto3
import pystache


class Config:
    @staticmethod
    def or_none(val):
        is_empty = val is None or val == ""
        return None if is_empty else val

    def __init__(
        self,
        bucket_name,
        dynamodb_table_name,
        s3_endpoint=None,
        dynamodb_endpoint=None,
        minio_user=None,
        minio_password=None,
    ):
        self.bucket_name = bucket_name
        self.dynamodb_table_name = dynamodb_table_name
        self.s3_endpoint = self.or_none(s3_endpoint)
        self.dynamodb_endpoint = self.or_none(dynamodb_endpoint)
        self.minio_user = self.or_none(minio_user)
        self.minio_password = self.or_none(minio_password)


def get_target_date():
    return datetime.date(2021, 11, 28)


def upload_file(bucket_name, file_path, object_name, **kwargs):
    s3 = boto3.resource("s3", **kwargs)
    bucket = s3.Bucket(bucket_name)
    bucket.upload_file(file_path, object_name)
    return object_name


def get_message(target_date, table_name, endpoint_url=None):
    dynamodb = boto3.resource("dynamodb", endpoint_url=endpoint_url)
    table = dynamodb.Table(table_name)
    str_date = target_date.strftime("%Y/%m/%d")
    try:
        response = table.get_item(Key={"Date": str_date})
    except ClientError as e:
        print(e.response["Error"]["Message"])
        raise e
    else:
        if "Item" in response:
            return response["Item"]["Message"]
    return None


def get_object_name(target_date: datetime.date, ext=".html"):
    str_date = target_date.strftime("%Y_%m_%d")
    return f"{str_date}{ext}"


def make_report(target_date, out_dir, config: Config):
    template = """
<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>Message</title>
    </head>
    <body>
        {{{ message }}}
    </body>
</html>
"""
    msg = get_message(target_date, config.dynamodb_table_name, config.dynamodb_endpoint)
    html = pystache.render(template, {"message": msg})

    object_name = get_object_name(target_date)
    html_path = os.path.join(out_dir, os.path.basename(object_name))
    with open(html_path, mode="w") as f:
        f.write(html)

    upload_file(
        config.bucket_name,
        html_path,
        object_name,
        endpoint_url=config.s3_endpoint,
        aws_access_key_id=config.minio_user,
        aws_secret_access_key=config.minio_password,
    )
    return object_name


def lambda_handler(event, context):
    config = Config(
        bucket_name=os.environ.get("SSB_BUCKET_NAME"),
        dynamodb_table_name=os.environ.get("SSB_DYNAMODB_TABLE_NAME"),
        s3_endpoint=os.environ.get("SSB_S3_ENDPOINT"),
        dynamodb_endpoint=os.environ.get("SSB_DYNAMODB_ENDPOINT"),
        minio_user=os.environ.get("SSB_MINIO_USER"),
        minio_password=os.environ.get("SSB_MINIO_PASSWORD"),
    )

    with tempfile.TemporaryDirectory() as temp_dir:
        object_name = make_report(get_target_date(), temp_dir, config)
        return {
            "BucketName": config.bucket_name,
            "ObjectName": object_name,
        }

※LinterとしてBlackを使っています。設定は全てデフォルトです。

template.yamlの作成

続いて、./template.yamlを書き換えます。
既存の内容は全て消去し、以下のようにします。

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: >
  simple-serverless-batch

  Sample SAM Template for simple-serverless-batch

Globals:
  Function:
    Runtime: python3.8
    Timeout: 15

Parameters:
  TableName:
    Type: String
    Description: "DynamoDB Table Name"
    Default: "Messages"
  DynamoDBEndpoint:
    Type: String
    Description: "Endpoint of DynamoDB Local. If api runs in aws, set it empty string."
    Default: ""
  S3Endpoint:
    Type: String
    Description: "Endpoint of S3 Local(minio). If api runs in aws, set it empty string."
    Default: ""
  S3BucketName:
    Type: String
    Description: "Bucket name of S3"
  MinioRootUser:
    Type: String
    Description: "Username for Minio"
  MinioRootPassword:
    Type: String
    Description: "Password for Minio."

Resources:
  ##################
  # DynamoDB Table #
  ##################
  MessagesTable:
    Type: AWS::Serverless::SimpleTable
    Properties:
      TableName: !Ref TableName
      PrimaryKey:
        Name: Date
        Type: String
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1

  ##############
  # IAM POLICY #
  ##############
  DynamoDBAccessPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "DynamoDB-student-activity"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Action:
              - "dynamodb:BatchGetItem"
              - "dynamodb:BatchWriteItem"
              - "dynamodb:BatchWriteItem"
              - "dynamodb:DescribeTable"
              - "dynamodb:GetItem"
              - "dynamodb:PutItem"
              - "dynamodb:Query"
              - "dynamodb:UpdateItem"
            Resource: !Sub "arn:aws:dynamodb:*:*:table/${TableName}"
      Roles:
        - !Ref MakeReportFunctionRole
    
  S3BucketAccessPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "S3-bucket-student-activity"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Action:
              - "s3:PutObject"
              - "s3:GetObject"
            Resource: !Sub "arn:aws:s3:::${S3BucketName}/*"
      Roles:
        - !Ref MakeReportFunctionRole

  ##############
  #  IAM ROLE  #
  ##############
  MakeReportFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  ##############
  #  Function  #
  ##############
  MakeReportFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/make_report/
      Handler: app.lambda_handler
      Environment:
        Variables:
          SSB_BUCKET_NAME: !Ref S3BucketName
          SSB_DYNAMODB_ENDPOINT: !Ref DynamoDBEndpoint
          SSB_DYNAMODB_TABLE_NAME: !Ref TableName
          SSB_MINIO_PASSWORD: !Ref MinioRootPassword
          SSB_MINIO_USER: !Ref MinioRootUser
          SSB_S3_ENDPOINT: !Ref S3Endpoint 
      Role: !GetAtt MakeReportFunctionRole.Arn

YAMLファイルについて

template.yamlも写経して書き方を覚えないと、、、と思っている方は、その考え方を捨ててください。
自分でいちから作成する場合は、なるべく公式ドキュメントに載っているExampleから要件に近いものをコピーしてください。

その際は、SAMだけでなく、Cloud Formationのドキュメントを参照することもあります。
コピーしたExampleをテンプレートとし、加筆修正していく感じで作成すると良いです。

普通のPythonなどのプログラムと違って、AWSリソースの作成には一定の時間がかかります。そのため、ちょっとしたミスでデプロイが失敗してYAMLを書き直す、、、を繰り返すと最終的にかなりの時間を消費します。
ドキュメントで提供されているサンプルコードをテンプレとして書き始めることで、小さなミスを軽減できます。

それに、何より各AWSリソースのYAMLの項目の名前や仕様を全部覚えるのは不可能です。

sam invokeでLambda関数を実行

ではさっそく、実行してみましょう。
その前に、Dockerのnetworkを作ります。
後ほど実行しますが、SAMはsam invokeというコマンドを使うことで、Lambda関数をローカルで実行できます。
その際、Lambda関数のコンテナを接続するdocker networkを指定できます。今回は、Lambda関数とdocker composeで立ち上げた各種サーバが通信できるよう、ssb-handsonというネットワークを作ります。

docker network create ssb-handson

さらに、docker-compose.yamlの末尾に以下の記述を追加し、docker compose upを実行し直してください。

networks:
  ssb-handson:
    external: true
    name: ssb-handson

以下のコマンドを実行します。

sam build && \
sam local invoke MakeReportFunction \
    --docker-network ssb-handson \
    --parameter-overrides \
      DynamoDBEndpoint=http://dynamodb:8000 \
      S3Endpoint=http://minio:9000 \
      S3BucketName=handson-bucket \
      MinioRootUser=root \
      MinioRootPassword=himitsu123

(注意) template.yamlを含む、ソースコードを変更した場合は都度sam buildを行う必要があります。
ただし、sam buildはPCのスペックによっては割と時間がかかります。その場合、以下のように関数を指定するとbuild時間を削減できます。

sam build MakeReportFunction

Minioに2021_11_28.htmlというファイルがアップロードされれば問題なく動作しています。
なお、Minioのデータはホストマシンの./docker/minio/data/に同期されているので、Minioコンソールから探すのが面倒であればホストマシンから直接ファイルを開いてください。

HtmlToPdf

HtmlToPdfはS3上にあるHTMLファイルをPDFに変換するLambda関数です。

ディレクトリとrequirements.txtの作成

テンプレートを活用します。

cp -r functions/stock_buyer functions/html_to_pdf

requirements.txtは以下のようになります。

pdfkit == 1.0.0

このpdfkitというのは、python-pdfkitのことです。
このライブラリを使ってHTMLファイルをPDFに変換するのですが、このライブラリはwkhtmltopdfに依存しています。これは実行環境にあらかじめインストールしておく必要があります。

今回は、Lambda関数にレイヤーを追加して対応します。つまり、wkhtmltopdfの実行バイナリをレイヤーとしてLambda関数に追加し、それをPythonのプログラムから使います。

Lambdaレイヤーの追加

では、追加していきます。
LambdaではレイヤーとしてZipファイルが利用できます。実際に関数が起動する際に、Zipが展開される、結果としてプログラムから参照できるということです。

wkhtmltopdfでは、Lambda関数のレイヤーとして利用するためのZipを公開しています。
以下のダウンロードサイトから、Amazon Linux向けのlambda.zipをダウンロードします。

wkhtmltopdf.org

ダウンロードしたら、それをfunctions/layer/wkhtmltopdfに配置しましょう。
以下のような構造になります。

.
├── __init__.py
├── layer
│   └── wkhtmltopdf
│       └── wkhtmltox-0.12.6-4.amazonlinux2_lambda.zip
~

続いて、template.yamlにLambdaレイヤーのリソースを追加します。
Resources配下に以下のコードを追加します。

  WkhtmltopdfLambdaLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      Description: "Binary of wkhtmltopdf. see https://wkhtmltopdf.org/downloads.html"
      ContentUri: functions/layer/wkhtmltopdf/wkhtmltox-0.12.6-4.amazonlinux2_lambda.zip

wkhtmltopdfはバージョンアップするかもしれません。執筆当時とZipファイル名が変更になっている可能性があるため、実際にダウンロードしたZipのファイル名に読み替えてください。

なお、このZipファイルはAWSによって自動で展開され、/opt/bin/wkhtmltopdfとしてアクセスできるようになります。

ビジネスロジックの実装

では、ビジネスロジックを実装していいきます。
functions/html_to_pdf/app.pyを以下のように書き換えます。

import os
import tempfile

import boto3
import pdfkit


class Config:
    @staticmethod
    def or_none(val):
        is_empty = val is None or val == ""
        return None if is_empty else val

    def __init__(
        self,
        s3_endpoint=None,
        minio_user=None,
        minio_password=None,
        wkhtmltopdf_path=None,
    ):
        self.s3_endpoint = self.or_none(s3_endpoint)
        self.minio_user = self.or_none(minio_user)
        self.minio_password = self.or_none(minio_password)
        self.wkhtmltopdf_path = self.or_none(wkhtmltopdf_path)


def download_file(bucket_name, object_name, dist_dir, **kwargs):
    s3 = boto3.resource("s3", **kwargs)
    bucket = s3.Bucket(bucket_name)
    object_path = os.path.join(dist_dir, os.path.basename(object_name))
    bucket.download_file(object_name, object_path)
    return object_path


def upload_file(bucket_name, file_path, object_name, **kwargs):
    s3 = boto3.resource("s3", **kwargs)
    bucket = s3.Bucket(bucket_name)
    bucket.upload_file(file_path, object_name)
    return object_name


def html_to_pdf(bucket_name, object_name, out_dir, config: Config):
    html_path = download_file(
        bucket_name,
        object_name,
        out_dir,
        endpoint_url=config.s3_endpoint,
        aws_access_key_id=config.minio_user,
        aws_secret_access_key=config.minio_password,
    )
    pdf_path = os.path.join(
        out_dir, os.path.splitext(os.path.basename(object_name))[0] + ".pdf"
    )
    options = {
        "enable-local-file-access": None,
        "header-right": "Simple Serverless Batch",
        "footer-right": "[page]/[topage]",
    }
    pdfkit_config = None
    if config.wkhtmltopdf_path:
        pdfkit_config = pdfkit.configuration(wkhtmltopdf=config.wkhtmltopdf_path)
    pdfkit.from_file(html_path, pdf_path, options=options, configuration=pdfkit_config)

    pdf_object_name = upload_file(
        bucket_name,
        pdf_path,
        os.path.basename(pdf_path),
        endpoint_url=config.s3_endpoint,
        aws_access_key_id=config.minio_user,
        aws_secret_access_key=config.minio_password,
    )
    return pdf_object_name


def lambda_handler(event, context):
    config = Config(
        s3_endpoint=os.environ.get("SSB_S3_ENDPOINT"),
        minio_user=os.environ.get("SSB_MINIO_USER"),
        minio_password=os.environ.get("SSB_MINIO_PASSWORD"),
        wkhtmltopdf_path=os.environ.get("SSB_WKHTMLTOPDF_PATH"),
    )
    bucket_name = event["BucketName"]
    html_object_name = event["ObjectName"]

    with tempfile.TemporaryDirectory() as temp_dir:
        pdf_object_name = html_to_pdf(bucket_name, html_object_name, temp_dir, config)
        return {
            "BucketName": bucket_name,
            "ObjectName": pdf_object_name,
        }

フォントの追加

Lambda関数の実行環境にはフォントがインストールされていません。そのため。PDFの文字が豆腐(■)になります。
そこで、フォントを追加します。

Lambda関数の実行環境であるAmazon LinuxではFontconfigが利用できます。
今回はこの仕組みを活用してフォントを使えるようにします。

まずはディレクトリを作成しましょう。

mkdir -p functions/html_to_pdf/assets/fonts

作成したfunctions/html_to_pdf/assets/fontsに以下のファイルを作成します。

touch functions/html_to_pdf/assets/fonts/fonts.conf

そして、以下の内容を書き込みます。

<?xml version="1.0"?>
<!DOCTYPE fontconfig SYSTEM "fonts.dtd">
<fontconfig>
  <dir>/var/task/assets/fonts</dir>
  <dir>/opt/fonts</dir>
  <cachedir>/tmp/fonts-cache</cachedir>
  <config></config>
</fontconfig>

さらに以下のURLより、IPAゴシックのフォントをダウンロードし、functions/html_to_pdf/assets/fontに展開します。

IPAex フォント Ver.004.01 | 一般社団法人 文字情報技術促進協議会

functions/html_to_pdf/assetsは以下のような構造になります。

functions/html_to_pdf/assets
└── fonts
    ├── fonts.conf
    └── ipaexg00401
        ├── IPA_Font_License_Agreement_v1.0.txt
        ├── Readme_ipaexg00401.txt
        └── ipaexg.ttf

template.yamlの更新

以下の2つのリソースをtemplate.yamlに追加してください。

  HtmlToPdfFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  HtmlToPdfFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: functions/html_to_pdf/
      Handler: app.lambda_handler
      Layers:
        - !Ref WkhtmltopdfLambdaLayer
      Environment:
        Variables:
          FONTCONFIG_PATH: /var/task/assets/fonts
          SSB_MINIO_PASSWORD: !Ref MinioRootPassword
          SSB_MINIO_USER: !Ref MinioRootUser
          SSB_S3_ENDPOINT: !Ref S3Endpoint 
          SSB_WKHTMLTOPDF_PATH: /opt/bin/wkhtmltopdf
      Role: !GetAtt HtmlToPdfFunctionRole.Arn

そして、IAM PolicyであるS3BucketAccessPolicyRolesのリストにHtmlToPdfFunctionRoleを追加してください。

  S3BucketAccessPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: "S3-bucket-student-activity"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Action:
              - "s3:PutObject"
              - "s3:GetObject"
            Resource: !Sub "arn:aws:s3:::${S3BucketName}/*"
      Roles:
        - !Ref MakeReportFunctionRole
        - !Ref HtmlToPdfFunctionRole # <----------- 追加

sam invokeでLambda関数を実行

さっそくLambda関数を実行していきます。

その前に、イベントを作成します。

touch html_to_pdf_sample_event.json

html_to_pdf_sample_event.jsonに以下の内容を書き込んでおきます。

{ "BucketName": "handson-bucket", "ObjectName": "2021_11_28.html" }

では、作成したイベントを使ってLambda関数を実行しましょう。

sam build && \
sam local invoke HtmlToPdfFunction \
    --docker-network ssb-handson \
    --event html_to_pdf_sample_event.json \
    --parameter-overrides \
      S3Endpoint=http://minio:9000 \
      S3BucketName=handson-bucket \
      MinioRootUser=root \
      MinioRootPassword=himitsu123

実行後に、Minio2021_11_28.pdfがアップロードされていればOKです。

次のステップ

次は、Step FunctionsとEvent Bridgeを使ってこの一連の処理の呼び出し、およびトリガーを実装していきましょう。
いよいよAWSにデプロイします。

www.ohitori.fun