モバイルアプリ開発をしていて、REST APIのリクエスト/レスポンスのペアをログ保存しておいて、 あとでリクエスト内容やレスポンス内容別にサクッと検索したいことがあったので、 mitmproxyを介してサーバレスポンスをフックして、リクエスト/レスポンス情報をマシンlocalに立てたElasticsearch向けにdumpするといったaddonを書いた
どうやるの
- mitmproxyのHTTPイベントのEvent Hooksをつかう。
- requestheaders/request/responseheaders/responseなどのフックが用意されていて、今回はレスポンス値を取り出したいので、responseフックをつかう。
- responseフックは以下のようなシグネチャ
def response(self, mitmproxy.http.HTTPFlow):
HTTPFlow
は、以下のようなclassclass HTTPFlow(flow.Flow): """ An HTTPFlow is a collection of objects representing a single HTTP transaction. """ request: Request """The client's HTTP request.""" response: Optional[Response] = None """The server's HTTP response.""" error: Optional[flow.Error] = None """ A connection or protocol error affecting this flow. Note that it's possible for a Flow to have both a response and an error object. This might happen, for instance, when a response was received from the server, but there was an error sending it back to the client. """ websocket: Optional[WebSocketData] = None """ If this HTTP flow initiated a WebSocket connection, this attribute contains all associated WebSocket data. """
mitmproxy.http.Request
,mitmproxy.http.Response
フィールドがそれぞれ生えているので、これら2つをdictにシリアライズできればよさそう
- responseフックは以下のようなシグネチャ
- 結論からいうと、
mitmproxy.http.HTTPFlow
には、オブジェクトのdict化に相当するget_state
メソッドが生えているget_state
自体は、mitmproxy.http.HTTPFlow
のsuper classを辿った先に行き着くmitmproxy.coretypes.serializable.Serializable
抽象基底クラスに生えているメソッド
mitmproxy.coretypes.serializable.Serializable
を直接に実装するのがmitmproxy.stateobject.StateObject
で、以下のようなget_state
実装が施されているclass StateObject(serializable.Serializable): """ An object with serializable state. State attributes can either be serializable types(str, tuple, bool, ...) or StateObject instances themselves. """ _stateobject_attributes: typing.ClassVar[typing.MutableMapping[str, typing.Any]] """ An attribute-name -> class-or-type dict containing all attributes that should be serialized. If the attribute is a class, it must implement the Serializable protocol. """ def get_state(self): """ Retrieve object state. """ state = {} for attr, cls in self._stateobject_attributes.items(): val = getattr(self, attr) state[attr] = get_state(cls, val) return state
_stateobject_attributes
がsub classたるmitmproxy.flow.Flow
やmitmproxy.http.HTTPFlow
でoverrideされるカタチで定義されていてpolymorphicな実装になっているmitmproxy.flow.Flow
_stateobject_attributes = dict( id=str, error=Error, client_conn=connection.Client, server_conn=connection.Server, type=str, intercepted=bool, is_replay=str, marked=str, metadata=typing.Dict[str, typing.Any], comment=str, )
mitmproxy.http.HTTPFlow
_stateobject_attributes = flow.Flow._stateobject_attributes.copy() # mypy doesn't support update with kwargs _stateobject_attributes.update(dict( request=Request, response=Response, websocket=WebSocketData, mode=str ))
- というわけで、
response
hookメソッドの引数として渡されるflowオブジェクトのget_state
呼出で、- flowオブジェクトのもつattributesがいいかんじにdict化されるしくみがすでに存在している
- 一方で、flowオブジェクトをdict化できたところで、そのままJSONとしてPOST可能な構造にはなっていない部分がある
- たとえば、
mitmproxy.http.Headers
はリクエストヘッダの(key, value)のTupleの集合(Tuple)であったり、これらの値はそれぞれbytes型であったりmitmproxy.http.Response
のcontent
はbytes型であったり
- たとえば、
- これらをJSONとして扱いたい場合、配列や文字列に変換する必要がある
- 実は、ありがたいことに、これらの変換処理については、mitmproxy公式repositoryのexamples/contrib内に存在している
- これをベースに、flowに格納されているリクエスト/レスポンス情報を任意の構造にかんたんにカスタマイズできる
- 今回書いたaddon実装は以下のようなもの
import asyncio import gzip import json import traceback from threading import Thread from typing import Optional, Any, Dict, List, Union import aiohttp from mitmproxy import ctx JSON = Any elasticsearch_url = "http://127.0.0.1:9200/apptraffic/_doc" dump_target_hosts = [ "api.example.com", ] class ElasticAgentAddon: fields = { "timestamp": ( ("error", "timestamp"), ("request", "timestamp_start"), ("request", "timestamp_end"), ("response", "timestamp_start"), ("response", "timestamp_end"), ), "headers": ( ("request", "headers"), ("response", "headers"), ), "content": ( ("request", "content"), ("response", "content"), ), } def __init__(self, url: str, hosts: List[str]): self.url: str = elasticsearch_url self.hosts: List[str] = hosts self.content_encoding: Optional[str] = None self.transformations: Optional[List[Dict[str, Any]]] = None self.worker_pool: Optional[ElasticAgentWorkerPool] = None def configure(self, _): self._init_transformations() self.worker_pool = ElasticAgentWorkerPool(self.url) self.worker_pool.start() def response(self, flow): """ Dump request/response pairs. """ if flow.request.host not in self.hosts: return for k, v in flow.response.headers.items(): if k.lower() == "content-encoding": self.content_encoding = v break state = flow.get_state() del state["client_conn"] del state["server_conn"] for tfm in self.transformations: for field in tfm["fields"]: self.transform_field(state, field, tfm["func"]) frame = self.convert_to_strings(state) self.worker_pool.put(frame) def _init_transformations(self): def map_content( content: Optional[bytes], ) -> Union[Optional[bytes], Any]: if self.content_encoding: content = Decoding.decode(content, self.content_encoding) try: obj = json.loads(content) except json.decoder.JSONDecodeError: return content else: return obj self.transformations = [ { "fields": self.fields["headers"], "func": dict, }, { "fields": self.fields["timestamp"], "func": lambda t: int(t * 1000), }, {"fields": self.fields["content"], "func": map_content}, ] @staticmethod def transform_field(obj, path, func): """ Apply a transformation function `func` to a value under the specified `path` in the `obj` dictionary. """ for key in path[:-1]: if not (key in obj and obj[key]): return obj = obj[key] if path[-1] in obj and obj[path[-1]]: obj[path[-1]] = func(obj[path[-1]]) @classmethod def convert_to_strings(cls, obj): """ Recursively convert all list/dict elements of type `bytes` into strings. """ if isinstance(obj, dict): return { cls.convert_to_strings(key): cls.convert_to_strings(value) for key, value in obj.items() } elif isinstance(obj, list) or isinstance(obj, tuple): return [cls.convert_to_strings(element) for element in obj] elif isinstance(obj, bytes): return str(obj)[2:-1] return obj class Decoding: class __Methods: @staticmethod def identity(content: bytes) -> str: return str(content) @staticmethod def decode_gzip(content: bytes) -> str: if not content: return "" return str(gzip.decompress(content), "utf-8") decoding_maps = { "none": __Methods.identity, "gzip": __Methods.decode_gzip, } @classmethod def decode(cls, encoded: bytes, encoding: str) -> Optional[str]: if encoding not in cls.decoding_maps: return None return cls.decoding_maps[encoding](encoded) class ElasticAgentWorkerPool(Thread): def __init__(self, url: str, num_workers: int = 10): super().__init__(name="ElasticAgentWorkerPool", daemon=True) self.url = url self.loop: Optional[asyncio.AbstractEventLoop] = None self.queue: Optional[asyncio.Queue[JSON]] = None self.num_workers = num_workers def run(self): loop = asyncio.new_event_loop() self.loop = loop asyncio.set_event_loop(loop) try: loop.run_until_complete(self._run_loop()) except Exception as e: ctx.log.error(e) ctx.log.error(traceback.format_exc()) else: if not loop.is_closed(): loop.close() ctx.log.info("ElasticAgentWorkerPool's event loop closed") def put(self, frame: JSON): if self.loop and not self.loop.is_closed(): self.loop.call_soon_threadsafe(self.queue.put_nowait, frame) async def _run_loop(self): self.queue = asyncio.Queue() await asyncio.gather( *(self.post_worker(i) for i in range(self.num_workers)) ) async def post_worker(self, id: int): while True: ctx.log.info(f"worker[{id}]:waiting...") frame = await self.queue.get() ctx.log.info(f"worker[{id}]:got task") async with aiohttp.ClientSession() as session: async with session.post(self.url, json=frame) as resp: ctx.log.info(f"worker[{id}]:Done") ctx.log.info(await resp.text()) addons = [ElasticAgentAddon(url=elasticsearch_url, hosts=dump_target_hosts)]
- 既存のcontrib実装との違いとしては、
- client_connやserver_connといった、コネクション周りの詳細情報は不要だったので省いている(あくまでrequest/responseのみに関心があった為)
- response contentについては、JSON APIであることを前提としていたので、文字列としてではなく、JSON(dict型)に変換している
- ElasticsearchへのPOSTを行なう非同期I/Oをasyncioベースに(なんとなく)
- 既存のcontrib実装との違いとしては、
add-onを仕込む
- add-onスクリプトは外部依存を追加しているので、mitmproxyに属するpipでインストールしておく
# install additional dependencies $ $(brew --prefix mitmproxy)/libexec/bin/pip install aiohttp # or if you installed mitmproxy with pipx $ pipx inject mitmproxy aiohttp
~/.mitmproxy/config.yml
に、scriptsとして指定するscripts: - /path/to/mitmproxy_elasticagent.py
- mitmproxyなりmitmdumpなり起動して通信傍受を開始
$ mitmproxy # or $ mitmdump
- mitmproxyの場合、
E
キーでEvents view表示することで、ログがみれるEvents info: Loading script /path/to/mitmproxy_elasticagent.py info: worker[0]:waiting... info: worker[1]:waiting... info: worker[2]:waiting... info: worker[3]:waiting... info: worker[4]:waiting... info: worker[5]:waiting... info: worker[6]:waiting... info: worker[7]:waiting... info: worker[8]:waiting... info: worker[9]:waiting... ⇩ [0/0] [scripts:1] [*:8080]
- mitmproxyの場合、
どううれしいか
Kibanaも立ててると、
- リクエスト一覧したり
- 特定レスポンス値でフィルタかけたり
いろいろサルベージできる⚓