mitmproxyを介したAPI通信をElasticsearchにdumpする

Posted on | 788 words | ~4 mins

モバイルアプリ開発をしていて、REST APIのリクエスト/レスポンスのペアをログ保存しておいて、 あとでリクエスト内容やレスポンス内容別にサクッと検索したいことがあったので、 mitmproxyを介してサーバレスポンスをフックして、リクエスト/レスポンス情報をマシンlocalに立てたElasticsearch向けにdumpするといったaddonを書いた

どうやるの

  • mitmproxyのHTTPイベントのEvent Hooksをつかう。
  • requestheaders/request/responseheaders/responseなどのフックが用意されていて、今回はレスポンス値を取り出したいので、responseフックをつかう。
    • responseフックは以下のようなシグネチャ
      def response(self, mitmproxy.http.HTTPFlow):
      
      • HTTPFlowは、以下のようなclass
        class HTTPFlow(flow.Flow):
            """
            An HTTPFlow is a collection of objects representing a single HTTP
            transaction.
            """
            request: Request
            """The client's HTTP request."""
            response: Optional[Response] = None
            """The server's HTTP response."""
            error: Optional[flow.Error] = None
            """
            A connection or protocol error affecting this flow.
        
            Note that it's possible for a Flow to have both a response and an error
            object. This might happen, for instance, when a response was received
            from the server, but there was an error sending it back to the client.
            """
        
            websocket: Optional[WebSocketData] = None
            """
            If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data.
            """
        
        • mitmproxy.http.Request, mitmproxy.http.Responseフィールドがそれぞれ生えているので、これら2つをdictにシリアライズできればよさそう
  • 結論からいうと、mitmproxy.http.HTTPFlowには、オブジェクトのdict化に相当するget_stateメソッドが生えている
    • get_state自体は、mitmproxy.http.HTTPFlowのsuper classを辿った先に行き着くmitmproxy.coretypes.serializable.Serializable抽象基底クラスに生えているメソッド
  • mitmproxy.coretypes.serializable.Serializableを直接に実装するのがmitmproxy.stateobject.StateObjectで、以下のようなget_state実装が施されている
    class StateObject(serializable.Serializable):
        """
        An object with serializable state.
    
        State attributes can either be serializable types(str, tuple, bool, ...)
        or StateObject instances themselves.
        """
    
        _stateobject_attributes: typing.ClassVar[typing.MutableMapping[str, typing.Any]]
        """
        An attribute-name -> class-or-type dict containing all attributes that
        should be serialized. If the attribute is a class, it must implement the
        Serializable protocol.
        """
    
        def get_state(self):
            """
            Retrieve object state.
            """
            state = {}
            for attr, cls in self._stateobject_attributes.items():
                val = getattr(self, attr)
                state[attr] = get_state(cls, val)
            return state
    
    • _stateobject_attributesがsub classたるmitmproxy.flow.Flowmitmproxy.http.HTTPFlowでoverrideされるカタチで定義されていてpolymorphicな実装になっている
      • mitmproxy.flow.Flow
        _stateobject_attributes = dict(
            id=str,
            error=Error,
            client_conn=connection.Client,
            server_conn=connection.Server,
            type=str,
            intercepted=bool,
            is_replay=str,
            marked=str,
            metadata=typing.Dict[str, typing.Any],
            comment=str,
        )
        
      • mitmproxy.http.HTTPFlow
        _stateobject_attributes = flow.Flow._stateobject_attributes.copy()
        # mypy doesn't support update with kwargs
        _stateobject_attributes.update(dict(
            request=Request,
            response=Response,
            websocket=WebSocketData,
            mode=str
        ))
        
    • というわけで、response hookメソッドの引数として渡されるflowオブジェクトのget_state呼出で、
      • flowオブジェクトのもつattributesがいいかんじにdict化されるしくみがすでに存在している
  • 一方で、flowオブジェクトをdict化できたところで、そのままJSONとしてPOST可能な構造にはなっていない部分がある
    • たとえば、
      • mitmproxy.http.Headersはリクエストヘッダの(key, value)のTupleの集合(Tuple)であったり、これらの値はそれぞれbytes型であったり
      • mitmproxy.http.Responsecontentはbytes型であったり
  • これらをJSONとして扱いたい場合、配列や文字列に変換する必要がある
  • 実は、ありがたいことに、これらの変換処理については、mitmproxy公式repositoryのexamples/contrib内に存在している
  • これをベースに、flowに格納されているリクエスト/レスポンス情報を任意の構造にかんたんにカスタマイズできる
  • 今回書いたaddon実装は以下のようなもの
    import asyncio
    import gzip
    import json
    import traceback
    from threading import Thread
    from typing import Optional, Any, Dict, List, Union
    
    import aiohttp
    from mitmproxy import ctx
    
    JSON = Any
    
    elasticsearch_url = "http://127.0.0.1:9200/apptraffic/_doc"
    dump_target_hosts = [
        "api.example.com",
    ]
    
    
    class ElasticAgentAddon:
        fields = {
            "timestamp": (
                ("error", "timestamp"),
                ("request", "timestamp_start"),
                ("request", "timestamp_end"),
                ("response", "timestamp_start"),
                ("response", "timestamp_end"),
            ),
            "headers": (
                ("request", "headers"),
                ("response", "headers"),
            ),
            "content": (
                ("request", "content"),
                ("response", "content"),
            ),
        }
    
        def __init__(self, url: str, hosts: List[str]):
            self.url: str = elasticsearch_url
            self.hosts: List[str] = hosts
            self.content_encoding: Optional[str] = None
            self.transformations: Optional[List[Dict[str, Any]]] = None
            self.worker_pool: Optional[ElasticAgentWorkerPool] = None
    
        def configure(self, _):
            self._init_transformations()
            self.worker_pool = ElasticAgentWorkerPool(self.url)
            self.worker_pool.start()
    
        def response(self, flow):
            """
            Dump request/response pairs.
            """
            if flow.request.host not in self.hosts:
                return
            for k, v in flow.response.headers.items():
                if k.lower() == "content-encoding":
                    self.content_encoding = v
                    break
            state = flow.get_state()
            del state["client_conn"]
            del state["server_conn"]
            for tfm in self.transformations:
                for field in tfm["fields"]:
                    self.transform_field(state, field, tfm["func"])
            frame = self.convert_to_strings(state)
            self.worker_pool.put(frame)
    
        def _init_transformations(self):
            def map_content(
                content: Optional[bytes],
            ) -> Union[Optional[bytes], Any]:
                if self.content_encoding:
                    content = Decoding.decode(content, self.content_encoding)
                try:
                    obj = json.loads(content)
                except json.decoder.JSONDecodeError:
                    return content
                else:
                    return obj
    
            self.transformations = [
                {
                    "fields": self.fields["headers"],
                    "func": dict,
                },
                {
                    "fields": self.fields["timestamp"],
                    "func": lambda t: int(t * 1000),
                },
                {"fields": self.fields["content"], "func": map_content},
            ]
    
        @staticmethod
        def transform_field(obj, path, func):
            """
            Apply a transformation function `func` to a value
            under the specified `path` in the `obj` dictionary.
            """
            for key in path[:-1]:
                if not (key in obj and obj[key]):
                    return
                obj = obj[key]
            if path[-1] in obj and obj[path[-1]]:
                obj[path[-1]] = func(obj[path[-1]])
    
        @classmethod
        def convert_to_strings(cls, obj):
            """
            Recursively convert all list/dict elements of type `bytes`
            into strings.
            """
            if isinstance(obj, dict):
                return {
                    cls.convert_to_strings(key): cls.convert_to_strings(value)
                    for key, value in obj.items()
                }
            elif isinstance(obj, list) or isinstance(obj, tuple):
                return [cls.convert_to_strings(element) for element in obj]
            elif isinstance(obj, bytes):
                return str(obj)[2:-1]
            return obj
    
    
    class Decoding:
        class __Methods:
            @staticmethod
            def identity(content: bytes) -> str:
                return str(content)
    
            @staticmethod
            def decode_gzip(content: bytes) -> str:
                if not content:
                    return ""
                return str(gzip.decompress(content), "utf-8")
    
        decoding_maps = {
            "none": __Methods.identity,
            "gzip": __Methods.decode_gzip,
        }
    
        @classmethod
        def decode(cls, encoded: bytes, encoding: str) -> Optional[str]:
            if encoding not in cls.decoding_maps:
                return None
            return cls.decoding_maps[encoding](encoded)
    
    
    class ElasticAgentWorkerPool(Thread):
        def __init__(self, url: str, num_workers: int = 10):
            super().__init__(name="ElasticAgentWorkerPool", daemon=True)
            self.url = url
            self.loop: Optional[asyncio.AbstractEventLoop] = None
            self.queue: Optional[asyncio.Queue[JSON]] = None
            self.num_workers = num_workers
    
        def run(self):
            loop = asyncio.new_event_loop()
            self.loop = loop
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._run_loop())
            except Exception as e:
                ctx.log.error(e)
                ctx.log.error(traceback.format_exc())
            else:
                if not loop.is_closed():
                    loop.close()
                ctx.log.info("ElasticAgentWorkerPool's event loop closed")
    
        def put(self, frame: JSON):
            if self.loop and not self.loop.is_closed():
                self.loop.call_soon_threadsafe(self.queue.put_nowait, frame)
    
        async def _run_loop(self):
            self.queue = asyncio.Queue()
            await asyncio.gather(
                *(self.post_worker(i) for i in range(self.num_workers))
            )
    
        async def post_worker(self, id: int):
            while True:
                ctx.log.info(f"worker[{id}]:waiting...")
                frame = await self.queue.get()
                ctx.log.info(f"worker[{id}]:got task")
                async with aiohttp.ClientSession() as session:
                    async with session.post(self.url, json=frame) as resp:
                        ctx.log.info(f"worker[{id}]:Done")
                        ctx.log.info(await resp.text())
    
    
    addons = [ElasticAgentAddon(url=elasticsearch_url, hosts=dump_target_hosts)]
    
    • 既存のcontrib実装との違いとしては、
      • client_connやserver_connといった、コネクション周りの詳細情報は不要だったので省いている(あくまでrequest/responseのみに関心があった為)
      • response contentについては、JSON APIであることを前提としていたので、文字列としてではなく、JSON(dict型)に変換している
      • ElasticsearchへのPOSTを行なう非同期I/Oをasyncioベースに(なんとなく)

add-onを仕込む

  • add-onスクリプトは外部依存を追加しているので、mitmproxyに属するpipでインストールしておく
    # install additional dependencies
    $ $(brew --prefix mitmproxy)/libexec/bin/pip install aiohttp
    
    # or if you installed mitmproxy with pipx
    $ pipx inject mitmproxy aiohttp
    
  • ~/.mitmproxy/config.ymlに、scriptsとして指定する
    scripts:
      - /path/to/mitmproxy_elasticagent.py
    
  • mitmproxyなりmitmdumpなり起動して通信傍受を開始
    $ mitmproxy
    # or
    $ mitmdump
    
    • mitmproxyの場合、EキーでEvents view表示することで、ログがみれる
      Events
      info: Loading script /path/to/mitmproxy_elasticagent.py
      info: worker[0]:waiting...
      info: worker[1]:waiting...
      info: worker[2]:waiting...
      info: worker[3]:waiting...
      info: worker[4]:waiting...
      info: worker[5]:waiting...
      info: worker[6]:waiting...
      info: worker[7]:waiting...
      info: worker[8]:waiting...
      info: worker[9]:waiting...
      
      
      
      
      ⇩  [0/0]   [scripts:1]                                                                                            [*:8080]
      

どううれしいか

Kibanaも立ててると、

  • リクエスト一覧したり
  • 特定レスポンス値でフィルタかけたり

いろいろサルベージできる⚓