忘れたくない予定のための通知

Posted on | 1034 words | ~5 mins

という巷によくありそうなモノをつくった。
kidsを保育園に預けていて、定期的に園からのおたよりで「mm/ddまでにxxxをもってきてほしい/つくってきてほしい」といった要求が届く。「歯磨きするから歯ブラシをビニールケースに入れて」「箸を練習していきます。箸を箸袋に入れてそれを昼食袋に」「プール開きに伴いxxx, yyy, zzzを入れたプールバッグを斯々然々..」といったもの。
弊家は家庭ナレッジをすべてScrapboxに集約していて、園からのおたよりもまた例によってScrapboxにアーカイブされている。というところで、記録は問題なくできている一方で、記憶がトぶことがしばしばあり、dueまでに指定のモノを用意できない/持参し忘れるということがしばしばあった。
カレンダに予定を入れておいてもなにかと忘れ、都度Slackのリマインダ通知を設定するのも作業的に面倒くさくてやってられない…と業を煮やした末、「対応を忘れないようにするための通知」を自動化しようとおもった。 (タスク管理といえばタスク管理だから専用のツールで管理という選択肢はあるのだけれども、家庭で扱うツールはなるたけ増やしたくないという事情もあった。ツール増やした先でコメント機能なんかがあるとコンテキストやナレッジが分散しがちでそれは避けたいなど)
それ系のGoogleカレンダ+Slackなインテグレーションはおあつらえのものがきっとあるとおもって探したが、ほしかったものはなさそうだったので自前でつくることにした。

つくったもの

AWS Lambda+EventBridgeベースのスケジューラ

  • EventBridgeでLambdaを定期実行するスケジューラを定義
  • LambdaはGoogleCalendar APIを叩いて向こう1週間分の指定カレンダの予定を取得
  • 1日後/3日後/1週間後にあたるイベントを選別し、指定Slack channelに通知

Slack通知サンプル

  • Slackアプリのアイコン画像は以下のような呪文でStable Diffusionで生成したらかわいいのが出来た。生成AI便利。
    Cute robot holding a calendar in its hand and reading it out in Monet style
    

Lambda(Pythonランタイム)本体のコードとしては以下のようなもの

import datetime
import json
import logging
import os
import re
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import boto3
import slack_sdk
import slack_sdk.errors
from google.oauth2 import service_account
from googleapiclient.discovery import build as build_google_service

logging.basicConfig()
logger = logging.getLogger()


SCOPES = ["https://www.googleapis.com/auth/calendar.readonly"]
GOOGLE_CALENDAR_ID = os.environ["GOOGLE_CALENDAR_ID"]
PARAM_KEY_SERVICE_ACCOUNT_JSON = os.environ["PARAM_KEY_SERVICE_ACCOUNT_JSON"]
PARAM_KEY_SLACK_API_TOKEN = os.environ["PARAM_KEY_SLACK_API_TOKEN"]
SLACK_CHANNEL_ID = os.environ["SLACK_CHANNEL_ID"]

REX_EVENT_DESCRIPTION_LINK = re.compile(r'href="([^"]*)"')


class ISO8601Formatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        tz_jst = datetime.timezone(datetime.timedelta(hours=+9), "JST")
        dt = datetime.datetime.fromtimestamp(record.created, tz_jst)
        return dt.isoformat()


@dataclass
class CalendarEvent:
    date: str
    summary: str
    description: str

    @property
    def link_url(self) -> Optional[str]:
        match = REX_EVENT_DESCRIPTION_LINK.search(self.description)
        if match:
            return match.group(1)
        return None


class RemainingDays(Enum):
    ONE_DAY = 1
    THREE_DAYS = 3
    ONE_WEEK = 7

    @property
    def date(self):
        target_date = datetime.datetime.now() + datetime.timedelta(days=self.value)
        return target_date.strftime("%Y-%m-%d")


class GoogleCalendarClient:
    def __init__(self):
        service_account_json = get_ssm_param(PARAM_KEY_SERVICE_ACCOUNT_JSON)
        service_account_info = json.loads(service_account_json)
        creds = service_account.Credentials.from_service_account_info(
            service_account_info, scopes=SCOPES
        )
        self.service = build_google_service("calendar", "v3", credentials=creds)

    def get_events(self):
        now = datetime.datetime.utcnow().isoformat() + "Z"  # 'Z' indicates UTC time
        events_result = (
            self.service.events()
            .list(
                calendarId=GOOGLE_CALENDAR_ID,
                timeMin=now,
                maxResults=10,
                singleEvents=True,
                orderBy="startTime",
            )
            .execute()
        )
        events = events_result.get("items", [])

        if not events:
            logger.info("No upcoming events found.")
        for event in events:
            date = event["start"].get("dateTime", event["start"].get("date"))
            summary = event.get("summary", "")
            description = event.get("description", "")
            yield CalendarEvent(date, summary, description)


class SlackClient:
    def __init__(self):
        api_token = get_ssm_param(PARAM_KEY_SLACK_API_TOKEN)
        self.client = slack_sdk.WebClient(token=api_token)

    def post_schedules(self, channel_id: str, events: list[CalendarEvent]):
        try:
            message = self._build_schedule_reminder_message(events)
            logger.info(f"✉️  Posting message:{message}")
            self.client.chat_postMessage(channel=channel_id, text=message)
        except slack_sdk.errors.SlackApiError as e:
            err = e.response.get("error", "")
            logger.error(f"❗error:{err}")

    def _build_schedule_reminder_message(self, events: list[CalendarEvent]) -> str:
        d = {d_r.date: d_r for d_r in RemainingDays}
        remaining_days_event_map = defaultdict(list)
        for event in events:
            if event.date in d:
                d_r = d[event.date]
                remaining_days_event_map[d_r.value].append(event)
        lines: list[str] = []
        lines.append(self._build_schedule_reminder_title_text())
        for k in sorted(remaining_days_event_map.keys()):
            events = remaining_days_event_map[k]
            lines.append(self._build_schedule_reminder_section_text(k))
            for event in events:
                lines.append(
                    self._build_list_item_text(
                        self._build_schedule_reminder_event_text(event)
                    )
                )
            lines.append("\n")
        return "\n".join(lines)

    def _build_schedule_reminder_title_text(self) -> str:
        return "Hey <!channel>, here is the upcoming schedules🗓️"

    def _build_schedule_reminder_section_text(self, days_remaining: int) -> str:
        match days_remaining:
            case 1:
                title = "tomorrow"
            case 7:
                title = "in a week"
            case _:
                title = f"in {days_remaining} days"
        padding = "=" * 20
        return self._build_bold_text(padding + f"{title}" + padding)

    def _build_schedule_reminder_event_text(self, event: CalendarEvent) -> str:
        date_text = f"{event.date}({get_weekday(event.date)}):"
        if event.link_url:
            return (
                f"{date_text}" f"{self._build_link_text(event.summary, event.link_url)}"
            )
        else:
            return f"{date_text}" f"{event.summary} {event.description}"

    def _build_link_text(self, text: str, url: str) -> str:
        return f"<{url}|{text}>"

    def _build_bold_text(self, text) -> str:
        return f"*{text}*"

    def _build_list_item_text(self, text) -> str:
        return f"• {text}"


def get_ssm_param(param_key):
    ssm = boto3.client("ssm")
    response = ssm.get_parameters(
        Names=[
            param_key,
        ],
        WithDecryption=True,
    )
    return response["Parameters"][0]["Value"]


def get_weekday(date_string: str):
    date = datetime.datetime.strptime(date_string, "%Y-%m-%d")
    weekdays = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
    return weekdays[date.weekday()]


def init_logging():
    logformatter = ISO8601Formatter(
        (
            "[%(levelname)s] @%(asctime)s "
            "- %(name)s.%(filename)s#%(funcName)s():L%(lineno)s "
            "- %(message)s"
        ),
        "%Y-%m-%dT%H:%M:%S%z",
    )

    for handler in logger.handlers:
        handler.setFormatter(logformatter)
        handler.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
        logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
        logger.addHandler(handler)


def main():
    init_logging()
    calendar_client = GoogleCalendarClient()

    events = list(calendar_client.get_events())
    for event in events:
        logger.info(event)
    slack_client = SlackClient()
    slack_client.post_schedules(channel_id=SLACK_CHANNEL_ID, events=events)


def lambda_handler(event, context):
    main()


動かすだけならLambdaだしロジック書いておしまいだが、折角なので実務運用でも耐えうるであろうワークフローにしてみた(=「IaCちゃんとやるとしたらどうするか」についての検討)


Lambdaアプリケーションのデプロイと実行

Pythonランタイム(poetryプロジェクト)のLambdaアプリケーションのデプロイ

├── .venv
├── function.jsonnet // lambroll設定ファイル
├── function.zip // Lambdaデプロイ時のarchive
├── lambda
│  └── main.py // Lambdaアプリ本体
├── Makefile // タスクランナ
├── poetry.lock
├── poetry.toml
├── pyproject.toml

Makefile(タスク)

## run:💻 Run main script locally
.PHONY: run
run:
	@poetry run main

## archive:📦 Archive AWS Lambda function as a zip
.PHONY: archive
archive:
	@rm -f ${LAMBDA_ZIP}
	cd .venv/lib/python3.10/site-packages/ \
		&& zip -r ../../../../${LAMBDA_ZIP} . -x "boto*"
	cd lambda \
		&& zip -g ../${LAMBDA_ZIP} main.py

## deploy:🚀 Deploy a function to AWS Lambda
.PHONY: deploy
deploy: archive
	lambroll \
		--tfstate ${TFSTATE_URL} \
			deploy \
				--src ${LAMBDA_ZIP}

## invoke:💨 Invoke deployed lambda function
.PHONY: invoke
invoke:
	@echo '{}' | lambroll \
		--tfstate ${TFSTATE_URL} \
			invoke \
				--log-tail

.PHONY: help
help: Makefile
	@echo
	@echo " Choose a command"
	@echo
	@sed -n 's/^##//p' $< | column -t -s ':' |  sed -e 's/^/ /'
	@echo
  • Lambdaのデプロイツールとしてはlambrollを使用
    • ecspresso然り、@fujiwaraさん作のデプロイツールによくお世話になる
    • インフラリソースは基本的にTerraform管理(後述)とするが、Lambdaは他のAWSリソースとは異なり、アプリケーションのライフサイクルでデプロイされるものなので、Terraform管理リソースとは棲み分ける選択をしている
  • make runでLambdaのローカル実行&デバッグ
  • make deployでlambrollによるLambdaデプロイ
  • make invokeでデプロイしたLambdaの実行

function.jsonnet(lambroll設定)

{
  Architectures: [
    'arm64',
  ],
  Description: '',
  EphemeralStorage: {
    Size: 512,
  },
  Environment: {
    Variables: {
      LOG_LEVEL: 'INFO',
      GOOGLE_CALENDAR_ID: '{{ env `GOOGLE_CALENDAR_ID` }}',
      SLACK_CHANNEL_ID: '{{ env `SLACK_CHANNEL_ID` }}',
      PARAM_KEY_SERVICE_ACCOUNT_JSON: '{{ env `PARAM_KEY_SERVICE_ACCOUNT_JSON` }}',
      PARAM_KEY_SLACK_API_TOKEN: '{{ env `PARAM_KEY_SLACK_API_TOKEN` }}',
    },
  },
  FunctionName: 'schedule-reminder',
  Handler: 'main.lambda_handler',
  MemorySize: 128,
  Role: '{{ tfstate `aws_iam_role.lambda_exec_iam_role.arn` }}',
  Runtime: 'python3.10',
  SnapStart: {
    ApplyOn: 'None',
  },
  Tags: {},
  Timeout: 15,
  TracingConfig: {
    Mode: 'PassThrough',
  },
}

シークレットをgit管理下に置く

  • {{ env }}表記で参照している環境変数は、.envファイルからautoload(direnv前提)されたもの
  • .envの値自体はsopsで暗号化されたものをcommitしてあり、環境セットアップ時に復号する
    • 以下のようなmakeターゲットを定義して、make bootstrapで復号された環境変数値が.envに出力される(故に.envは.gitignoreに含めるカタチ)
      SOPS_SECRETS := .enc.env
      
      ## bootstrap:🌱 Bootstrap project
      .PHONY: bootstrap
      bootstrap:
      	sops -d ${SOPS_SECRETS} > .env
      	poetry install
      
  • direnv前提と書いたが、.envrc自体はgit管理下に置くカタチ(sops使用にあたり、SOPS_KMS_ARN環境変数は定義済の必要がある為)
    export SOPS_KMS_ARN='arn:aws:kms:<region>:<account_id>:key/xxx'
    dotenv
    
  • 暗号化された環境変数は.env.envファイルでgit管理
    LOG_LEVEL=ENC[AES256_GCM,data:T8ZyCWQ=,iv:CF29x2Xzjz7nLBkmucXp5IC+ZmsPIFVKAeY5fn6OM1Q=,tag:ohr3SrlOc+xIkH3BYqD+iw==,type:str]
    GOOGLE_CALENDAR_ID=ENC[AES256_GCM,data:KGq9mzglqpvexMArY8DZWu/toztocqYxaoJp6GKEP5A7UUIXdnJW5vs2ZIo8W/wqrBS7pdv12KNgGKLGtPfF4ML/MvlfuTllQ/hbs9voZyOttxXT2bOL208fXzQ=,iv:zydKGRYBxaAqMjHDR7BWpSXzT+JkRSi0pwLNanxhWbE=,tag:dvW6Po3wBmDaam5Rlzja7g==,type:str]
    SLACK_CHANNEL_ID=ENC[AES256_GCM,data:a0mebuPDfa0GGL4=,iv:bLhxzIDR6gHvtQEWlKKhXNnd4R4az7kGxHUeozv/IuQ=,tag:YPs6stYFBKiRCSr7jSlkYQ==,type:str]
    PARAM_KEY_SERVICE_ACCOUNT_JSON=ENC[AES256_GCM,data:2IlXX+rtEQoZJVDzV2mZh4E1GrN3q7fqe2y4kRm5KidVLqacps37r6/+SvAvFC83OYBMOfWzLIc=,iv:3jnrCDu4Kpadyyl61QyyzczkcqRIPFOGQiobcLkDgT4=,tag:FG7udglo1xNW49URV7NNgw==,type:str]
    PARAM_KEY_SLACK_API_TOKEN=ENC[AES256_GCM,data:9nE5yoVh5nBm8jIv/9NbvQlGZC+PL+ghK4NrE/waMp/Y3mPI,iv:c0Y/k4oRvqsSd+BNmyfx1De05NepVeyHLWroCLGKnkg=,tag:l5LzPyKAwuu3cKqIgCFYTQ==,type:str]
    TFSTATE_URL=ENC[AES256_GCM,data:srip5oVHCCDKxauWnWN+NHbxevQlzAEHfjbigmW8qj0iB+a/KMswCui+pVizPcimD6S4SOoJn7zPERbXT0xiAcyvpc2Y21DEbKRy,iv:ORQIh040/HnTytC/ghITJ9HoRpLrq09XYqvWcSV2DO8=,tag:bNlNYsHT0cckEJtrEpgvHQ==,type:str]
    

Terraformでインフラリソース管理

  • Lambdaアプリケーションはlambrollでデプロイ管理を行い、
  • 他のAWSリソースはTerraformで行う(aws_lambda_functionはTerraform管理としない)
.
├── function.jsonnet
├── lambda
│  └── main.py
├── Makefile
├── poetry.lock
├── poetry.toml
├── pyproject.toml
└── terraform // 👈👈👈
   ├── backend.tf // s3でtfstate管理(cloudposse/terraform-aws-tfstate-backendモジュール利用)
   ├── eventbridge.tf // スケジューラを定義するEventBridge
   ├── iam_role.tf // Lambda用IAMロール
   ├── kms.tf // sops用のKMSキー
   ├── locals.tf
   ├── main.tf
   ├── Makefile
   ├── outputs.tf
   ├── README.md
   ├── secrets.yml // SSMパラメータストアに保存するシークレット(sopsで暗号化した上でgit管理)
   ├── sops.tf // carlpett/sopsモジュールを利用し、terraformコード内でシークレット値を参照
   ├── ssm_params.tf // SSMパラメータストア(APIキーなど)
   └── variables.tf

スケジューラ(EventBridge)定義

eventbridge.tf

resource "aws_scheduler_schedule_group" "lambda_eventbridge_scheduler_group" {
  name = "${local.service_id}-scheduler-group"
}

resource "aws_scheduler_schedule" "lambda_eventbridge_scheduler_schedule" {
  name                         = "${local.service_id}-lambda-scheduler-schedule"
  group_name                   = aws_scheduler_schedule_group.lambda_eventbridge_scheduler_group.name
  schedule_expression          = "cron(0 9 * * ? *)"
  schedule_expression_timezone = "Asia/Tokyo"
  flexible_time_window {
    mode = "OFF"
  }
  target {
    arn      = var.lambda_arn
    role_arn = aws_iam_role.lambda_eventbridge_scheduler_iam_role.arn
  }
}

resource "aws_iam_role" "lambda_eventbridge_scheduler_iam_role" {
  name               = "${local.service_id}-lambda-scheduler-role"
  assume_role_policy = <<POLICY
{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "Service": "scheduler.amazonaws.com"
      }
    }
  ],
  "Version": "2012-10-17"
}
POLICY

  managed_policy_arns  = [aws_iam_policy.lambda_eventbridge_scheduler_iam_policy.arn]
  max_session_duration = "3600"
  path                 = "/service-role/"
}

resource "aws_iam_policy" "lambda_eventbridge_scheduler_iam_policy" {
  name = "${local.service_id}-lambda-scheduler-policy"
  path = "/service-role/"

  policy = <<POLICY
{
  "Statement": [
    {
      "Action": "lambda:InvokeFunction",
      "Effect": "Allow",
      "Resource": "${var.lambda_arn}"
    }
  ],
  "Version": "2012-10-17"
}
POLICY
}

シークレットの管理

  • Lambdaデプロイに必要な環境変数の他、SSMパラメータストアに保存するシークレットもgit管理に置くため、sopsで暗号化したファイル(secrets.yml)を置いている
  • carlpett/sops Terraformモジュールにより、Terraformコードから復号したシークレット値を参照している

secrets.yml

service_credentials_json: ENC[AES256_GCM,data:MB6qKu7r7H7IMCw0dEvHST+...,type:str]
slack_api_token: ENC[AES256_GCM,data:hBNKNShTZyfX8OSPyZJSp3BRD9TBxia+...,,type:str]
sops:
    kms:
        - arn: arn:aws:kms:<region>:<account_id>:key/xxx
          created_at: "..."
          enc: ...
          aws_profile: ""

Terraformコード

terraform {
  required_providers {
    ...
    sops = {
      source  = "carlpett/sops"
      version = "~> 0.7"
    }
  }
}

# シークレットをsops_fileデータとして定義
data "sops_file" "secrets" {
  source_file = "secrets.yml"
}

resource "aws_ssm_parameter" "ssm_param_gcp_service_account_credential_json" {
  name  = "/${local.service_id}/GCP_SERVICE_ACCOUNT_CREDENTIAL_JSON"
  type  = "SecureString"
  # 復号シークレット値をsops_fileデータから参照
  value = data.sops_file.secrets.data["service_credentials_json"]
}

resource "aws_ssm_parameter" "ssm_param_slack_api_token" {
  name  = "/${local.service_id}/SLACK_API_TOKEN"
  type  = "SecureString"
  value = data.sops_file.secrets.data["slack_api_token"]
}

つくった結果

参考