RareJob Tech Blog

レアジョブのエンジニア・デザイナーによる技術ブログです

分析チームの開発スタイルについて

データサイエンティストの山本(@hayata_yamamoto)です。

レアジョブでは、EdTech Labという研究開発の部署で、主にスピーキングテストの自動化プロジェクトに関わっています。弊社全体としては、PHP, Go, TypeScriptなどがメイン言語ですが、私たちのチームはPythonで開発をしています。

今回は、Pythonを用いた分析チームの開発スタイルの話をします。

www.rarejob.co.jp

分析チームでの開発スタイル

分析チームでは、開発で以下のような工夫をしています。なお、コマンドラインから実行できるものについてはMakefileにコマンドを記載し、開発者がコマンドを覚えなくても良い仕様にしています。

  • 特徴量エンジニアリングとトレーニングの切り分け、最適化処理の簡略化
  • jupyterlab_templatesでのNotebookスタイルの統一
  • コア部分の単体テスト
  • mypyでの型チェック
  • flake8でのLintと、autoflake, black, isortを用いたコード整形

このスタイルに至った背景

私たちのプロジェクトは、同じデータセットに対してそれぞれ特性の違う機械学習モデルを用意する必要があるため、チームを編成し機械学習のプロジェクトを行っていく必要がありました。Python機械学習エコシステムをうまく活用しつつ、メンバーとの協業がしやすい環境を作るために、いわゆる設計思想に当たるものが必要になりました。

そこで私たちのチームでは、以下のように開発をしていくことにしています。

* 汎用的なコードを統一し共有して、チームメンバー全員がアクセスできるようにします。
* コードは明示的に型を宣言し、インターフェースを統一することで可用性を高めます。

というのも、機械学習や分析関連のコードは往々にして難しいロジックや高度な背景知識が必要になるため、自分以外の人との共有や関数の再利用までのハードルが高いという問題点を抱えていました。また、使えたとしても自分が書いていない関数を利用する際に、「意図しないエラー」や「関数はエラーなしで返答するのに、思った値と違う」といった問題は著しく開発のスピードを落とすので避ける必要がありました。

故に、なるべく開発者の手間を増やさず、かつ開発者同士の知見の共有とコードの再利用可能性を高めるために諸々の工夫をしています。

どのようにやっているか

特徴量エンジニアリングとトレーニングの切り分け、最適化処理の簡略化

弊チームの開発では、「特徴量エンジニアリング」と「トレーニング」でノートブックを分けています。(厳密には、「共通化部分の実装」もあるのですが、プロジェクトによって入れたり入れなかったりするので今回は省略します)

これは、「特徴量を共通化する」「精度の高いモデルを作る」のどちらも大切にしていきたいという思いを反映しています。それぞれのフェイズで求めるもの、求めないものを明示的に決定し、PRの際には何を見て、どのようなことをチェックするかを言語化しています。

f:id:hayata-yamamoto:20190627193249j:plain
Feature Engineering

f:id:hayata-yamamoto:20190627193326j:plain
Training

scikit-learnベースのモデルにOptunaを用いたパラメータ最適化も行なっています。

Optunaは、機械学習のハイパーパラメータ最適化を自動で行うライブラリです。開発元は、Preferred Networks社で、並列分散最適化や脈のないパラメータの枝切りなどをよしなに行なってくれる素晴らしいライブラリです。もしかすると、Kaggleなどで目にした人もいるかもしれません。裏側では、Tree-structured Parzen Estimator というベイズ最適化アルゴリズムを用いて最適化を行なっているようです。*1

よく使うモデルについてはあらかじめOptunaをラップしたクラスを用意して、特徴量のDataFrameと最適化したい指標(正答率など)を指定したら勝手にやってくれるようにしています。もちろん、自分たちがTensorFlowやPyTorchなどを用いて機械学習モデルを構成する場合は、別途最適化処理を書いたりします。

from copy import deepcopy
from typing import Callable, Dict, Iterator, List, Optional, Union

import numpy as np
import optuna
import pandas as pd
import sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from tqdm import tqdm


class ParameterOptimizer:
    def __init__(
        self,
        x: pd.DataFrame,
        y: pd.DataFrame,
        pipeline: Optional[Pipeline] = None,
        cv: int = 5,
        scoring: str = "accuracy",
        verbose: bool = False,
    ) -> None:
        """
        Args:
            x: dependent variables. this shape is (n_samples, n_features)
            y: predictor variables. this shape is (n_samples, )
            pipeline:
            cv:
            scoring:
        """
        self.x = x
        self.y = y
        self.cv = cv
        self.scoring = scoring

        if verbose:
            optuna.logging.enable_default_handler()
        else:
            optuna.logging.disable_default_handler()

        if pipeline is None:
            self.pipeline = pipeline
        else:
            assert isinstance(pipeline, Pipeline), "pipeline must be sklearn.pipeline.Pipeline instance"
            self.pipeline = pipeline

    def _add_model(self, model: sklearn.base.BaseEstimator) -> Pipeline:
        """
        add model to sklearn pipeline as a last step
        Args:
            model:
        Returns:
        """
        if self.pipeline is None:
            return Pipeline([("model", model)])

        pipeline = deepcopy(self.pipeline)
        pipeline.steps.append(("model", model))
        return pipeline

    def _optimize(
        self, jobs: Optional[Dict[str, Callable]] = None, n_trials: int = 10, n_jobs: int = 1
    ) -> Iterator[Dict[str, Union[str, float]]]:
        """
        Args:
            jobs:
            n_trials:
            n_jobs:
        Returns:
        """
        if jobs is None:
            jobs = {
                "LogisticRegression": self.optimize_logistic_regression,
                "SVC": self.optimize_svc,
                "DecisionTree": self.optimize_decision_tree,
                "RandomForest": self.optimize_random_forest,
            }

        for model, job in tqdm(jobs.items()):
            study = optuna.create_study()
            study.optimize(job, n_jobs=n_jobs, n_trials=n_trials)
            result = {"model": model, "best_score": 1 - study.best_value, "best_params": study.best_params}
            yield result

    def optimize(
        self, jobs: Optional[Dict[str, Callable]] = None, n_trials: int = 10, n_jobs: int = 1
    ) -> List[Dict[str, Union[str, float]]]:
        """
        Args:
            jobs:
            n_trials:
            n_jobs:
        Returns:
        """
        return list(self._optimize(jobs=jobs, n_jobs=n_jobs, n_trials=n_trials))

    def optimize_logistic_regression(self, trial: optuna.Trial) -> float:

        params = {
            "C": trial.suggest_uniform("C", 0.1, 10),
            "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
            "intercept_scaling": trial.suggest_uniform("intercept_scaling", 0.1, 2),
            "solver": trial.suggest_categorical("solver", ["newton-cg", "lbfgs", "liblinear", "saga"]),
            "max_iter": trial.suggest_int("max_iter", 100, 1000),
            "multi_class": trial.suggest_categorical("multi_class", ["auto"]),
        }

        clf = self._add_model(LogisticRegression(**params))
        score = cross_val_score(clf, self.x, self.y, cv=self.cv, scoring=self.scoring, error_score=np.nan)
        return 1 - score.mean()

    def optimize_svc(self, trial: optuna.Trial) -> float:

        params = {
            "C": trial.suggest_uniform("C", 0.1, 10),
            "kernel": trial.suggest_categorical("kernel", ["linear", "rbf", "poly", "sigmoid"]),
            "gamma": trial.suggest_uniform("gamma", 0.1, 2),
            "coef0": trial.suggest_int("coef0", 0, 10),
            "shrinking": trial.suggest_categorical("shrinking", [True, False]),
        }

        clf = self._add_model(SVC(**params))
        score = cross_val_score(clf, self.x, self.y, cv=self.cv, scoring=self.scoring, error_score=np.nan)
        return 1 - score.mean()

    def optimize_decision_tree(self, trial: optuna.Trial) -> float:

        params = {
            "criterion": trial.suggest_categorical("criterion", ["gini", "entropy"]),
            "splitter": trial.suggest_categorical("splitter", ["random", "best"]),
            "max_depth": trial.suggest_int("max_depth", 2, 50),
        }

        clf = self._add_model(DecisionTreeClassifier(**params))
        score = cross_val_score(clf, self.x, self.y, cv=self.cv, scoring=self.scoring, error_score=np.nan)
        return 1 - score.mean()

    def optimize_random_forest(self, trial: optuna.Trial) -> float:

        params = {
            "criterion": trial.suggest_categorical("criterion", ["gini", "entropy"]),
            "max_depth": trial.suggest_int("max_depth", 2, 100),
        }

        clf = self._add_model(RandomForestClassifier(**params))
        score = cross_val_score(clf, self.x, self.y, cv=self.cv, scoring=self.scoring, error_score=np.nan)
        return 1 - score.mean()

jupyterlab_templatesを利用したNotebookスタイルの統一

Jupyter Notebookは便利ですが、うまく利用しないと再現性やドキュメントのスタイルが属人的になってしまう問題があります。例えば、ある実験で得られた知見をどのようにまとめるかはかなり人に依存してしまいます。私たちのチームでは、Pull Requestをベースにした開発をするため、開発者それぞれが自分好みのノートブックを作成してしまうとレビュワーに負担がかかってしまいます。また、レビュワーがOKを出したら、社内向けに公開もしているため、統一したフォーマットが必要でした。

juptyer_templatesを用いると、あるノートブックをテンプレートとして、そのノートブックをベースにして新しいノートブックを作成できるため、便利です。

現在利用しているテンプレートは2種類運用しています。

  • アドホックな分析用
    • 実験の結果やまとめを書くドキュメント部分のみ共通化
  • モデルトレーニング用
    • 実験に使用した特徴量やパイプライン、モデル構成などを記述するドキュメント部分
    • optunaをベースにしたパラメータ最適化部分の共通化

インストールと利用開始はとても簡単です。コマンドラインから、

$ pip install jupyterlab_templates && \
jupyter labextension install jupyterlab_templates && \
jupyter serverextension enable --py jupyterlab_templates

として、jupyterlab_templatesをインストールし、jupyterlabに登録します。その後、~/.jupyter/jupyter_notebook_config.pyに以下を書き込めばOKです。

c.JupyterLabTemplates.template_dirs = ["ディレクトリのパス"]
c.JupyterLabTemplates.include_default = False # サンプルのテンプレートを使いたければTrue

コア部分の単体テスト

今は、pytestをメインに利用しています。unittestでも十分でしょう。pytestには、pytest-codestyleというライブラリがあり、初期は簡易的なlintとしても利用していました。

例えば、word_countを計算する場合は以下のような実装になります。

def test_word_count():
    df = pd.DataFrame(
        {
            "words": [
                [["hello", "world"]],
                [["Automate", "the", "Boring", "Stuff", "with", "Python"]],
                [["Buffalo", "buffalo", "Buffalo", "buffalo", "buffalo", "buffalo", "Buffalo", "buffalo"]],
            ]
        }
    )
    ret = word_count(df) # word countを計算する実装
    assert ret.equals(pd.Series([2, 6, 8]))

多くの人が必ず使うであろう関数については、テストを書くようにしています。まだ、明確にどのタイミングでテストを書くかについては言語化できていない部分があり、試行錯誤の段階です。

mypyでの型チェック

私たちのチームでは、関数やクラスの引数や返り値の型を明示的に宣言して、他の人が実装したコードを再利用しやすくする取り組みをしています。例えば、単語数を数える関数を実装するなら、

def word_count(word): # 避ける
    return len(word.split()) 

def word_count(word: str) -> int: # こちらで書く
    return len(word.split())

といった具合に、インプットとアウトプットの型を書くようにしています。これを行うことで、「実装の内容全体を理解するのは難しいが、利用はしたい」というケースに対応できます。実際、信号処理や自然言語処理、統計モデルや機械学習を普段扱うために、それぞれの実装者が全ての知識を得るのは大変です。難しいところは詳しい人に任せて、出来上がった実装は皆が利用できるようにしています。

しかし、この型付けを努力目標にとどめてしまうと、いづれ使われなくなって技術的な負債が増えることが想像されます。その対策として、型をチェックするツールとしてmypyを採用し、Pull Requestを出すたびに必ず型が適切に書かれているかをチェックしています。

CIが落ちているとコードレビューを行わないフローにすることで、全てのコードに型が書かれている環境を維持しています。mypyは、python用の静的型付けツールで、関数やクラスの入力の型とその返り値が適切に書かれているかをチェックしてくれます。dropboxを中心に開発が行われています。

その時に使用されているコマンドは以下です。それぞれのコマンドについては、こちらに詳しい記載があります。

$ mypy --allow-redefinition --ignore-missing-imports --disallow-untyped-defs --warn-redundant-casts --show-error-context --no-incremental --no-implicit-optional --html-report ./report project_dir/

flake8でのLintと、autoflake, black, isortを用いたコード整形

Pythonのコーディング規約であるPEP8に私たちのコードが準拠できているかを確認するために、flake8を用いています。 flake8とは、以下のようなコマンドを実行すると、プロジェクト全体のコードを走査して、PEP8に従っていないコードを、従っていないとされる規約とともに教えてくれるツールです。

$ flake8 project_dir/

39      E111 indentation is not a multiple of four
1       E128 continuation line under-indented for visual indent
1       E302 expected 2 blank lines, found 1
10      F821 undefined name 'cmp'
1       H306  imports not in alphabetical order (urlparse, urllib)

実際の利用ではいくつか設定をsetup.cfgに記述して以下の仕様で利用しています。一行あたりの最大文字数の制限と、PEP8の任意の項目については無視しています。

[flake8]
max-line-length=119
ignore=E121,E123,E126,E133,E226,E241,E242,E704,W503,W504,W505, E127,E266,E402,W605,W391,E701,E731

上記をCI上で行うことで、PEP8に従っていないコードが生まれないようにしています。

ただ、自分のコードがPEP8に準拠しているかどうかを常に気にしながらコードを書くのは正直苦痛です。なので開発者の負担を減らすための工夫もしています。

具体的には、いくつかのライブラリを組み合わせ、自分のコードが勝手にPEP8準拠にフォーマットされるようにしています。

$ autoflake --in-place --remove-all-unused-imports --remove-unused-variables --recursive project_dir/ &&\
 isort -rc project_dir/ &&\
 black --line-length 119 project_dir/

使用しているのは、autoflake, isort, blackの3つです。直感的には、「flake8に従わせる」「import部分を整理する」「コードスタイルを綺麗にする」といった感じです。もちろん、それぞれを単体で利用しても十分コードは綺麗になります。この組み合わせにしているのは、ライブラリごとにフォーマットの仕方に癖があるからです。

それぞれの開発者は、自由にコードを書いて

$ make format 

とするだけです。

おわりに

分析チームがもっと楽して、楽しく面白い分析できるようにしていきたいなと思ってやみません。