はじめに
ジョブデータコアグループに所属している池田です。 ジョブデータコアグループでは、求人情報の取り込み、求人情報の管理、検索エンジンまでのインデックスを行っております。
我々のチームでは2020年11月からスタンバイのクローリングシステムをリアーキテクト・リプレイスしたのですが、 今回はその時の一部のプロダクトについて課題と実際に2年間運用してどうだったのか振り返りを書いていきます。
課題と背景
課題背景は過去のブログ「スタンバイの求人情報取込の仕組みを作り直した話 〜序章〜」でも記載しましたので詳細は割愛しますが 当時求人取り込みの現場で以下のような問題が発生しておりました。
求人取込から広告掲載まで最大6-7時間かかる 求人のフィールドを変更するのに1ヶ月かかった 求人情報の論理的な破損が発生後、復旧までに1ヶ月かかった
大きな問題として
「求人・広告データが1箇所で保管されていることで、データの柔軟性、拡張性に問題が発生している」
という技術的負債があったのでリアーキテクトを行っていったのですが、 その中でレガシーコード化が進んでいるものがありました。
それがes書き込みバッチ
というシステムです。
※1 レガシーコードとは (理由はなんであれ) 修正、拡張、作業が非常に難しいコードのことを指しています。 (「レガシーコードからの脱却」より)
es書き込みバッチについて
es書き込みバッチ
は クローリングした求人から、求人の情報を抽出し、正規化して、検索エンジンへインデクシングするという責務を担っております。
図のes書き込みバッチ
となっているシステムになります。
2015年から開発されて、Scala
とApache Spark
(オープンソースの分散処理システム)で作られておりました。
システムの処理の流れとしてクローリングデータ(文字列のjsonのデータ)を受け取り、以下のパイプライン処理を行います。
- クローリングデータ(jsonデータ)から求人のフィールドの抽出
- フォーマットチェック
- 求人情報の正規化、
- 検索エンジンへの書き込み
しかし処理の過程の型がOption[Map[String, Option[Any]]]
という型で
どのような変更が行われ、最終的にどの項目が書き込まれているのか、ということがコードからすぐには理解できない状態でした。
各パイプラインの返り値の型
// jsonから求人データの抽出した後の型 Map[String, Option[Any]] // フォーマットチェック後の型 Option[Map[String, Option[Any]]] // 求人情報の正規化後の型 Option[Map[String, Option[Any]]]
- Optionは値がその名の通り値がオプショナルである型で、値があれば値を返し、なければ値を返しません。
- Mapは、「キー」と対応する「値」の2つの要素をペアにして格納するデータ構造です。
- Scalaの世界のAny型はもっとも汎用的な型になり、どの型の値も入れることができます。
簡単な例ですが、Map[String, Option[Any]]
は以下のようなデータを入れることができます。
val sample = Map[String, Option[Any]]( "jobTitle" -> Option("バックエンドエンジニア"), "jobContent" -> Option("スタンバイのバックエンド開発業務"), "jobType" -> Option(Array[String]("正社員")), "siteName" -> Option("スタンバイ"), "salary" -> Option( Map( "displayString" -> None ) ) )
アウトプット先がElasticsearchなので書き込みはできるのですが(スキーマレスに書き込みができるため)、 求人の項目をMapで表現しているためコード上で求人の各項目の管理難しく、さらに検索エンジンで使われていない 項目のフィールドも存在しておりました。
一方でUnitテストがしっかり書かれていたので、Unitテストを通してコードを拡張し保守運用ができている状態でした。
またパイプライン処理が型で抽象化されており、全体的なシステムの構造として理解しやすい作りになっておりました。 おかげで5年経っても全体的なコードの見通しは良い状態でした。
/** * パイプライン処理で使用する1つの処理を表すトレイト。 * * @tparam IN 入力パラメータの型 * @tparam OUT 出力パラメータの型 */ trait Stage[IN, OUT] { /** * このステージでの処理をこのメソッドに実装します。 * * @param in 入力パラメータ * @return 出力パラメータ */ def process(in: IN): OUT /** * このステージの次に別のステージを連結してパイプラインを構成します。 * * @param nextStage 次のステージ * @tparam T 次のステージの出力パラメータの型 * @return パイプライン */ def |[T](nextStage: Stage[OUT, T]) /** * このステージの次に別のステージを連結してパイプラインを構成します。 * このメソッドで追加されたステージは | で連結されたステージとは異なり、 * 前のステージで例外が発生した場合でも必ず呼び出されます。 * * @param nextStage 次のステージ * @tparam T 次のステージの出力パラメータの型 * @return パイプライン */ def >[T](nextStage: Stage[Either[ExtractorThrowable, OUT], T]) }
どのようなプロセスで進めたのか?
以下の手順でシステムの作り直しを行いました。
- 既存のコードをチーム全員でコードリーディングし、何をやっているのか?何のためにあるのか?必要なのか?を議論してコード上での共通認識をもつ
- 求人データで使われている項目(必須、非必須)、使われていない項目の整理
- ドメインモデルを定義する
- パイプライン処理のフローで型を定義する
- 既存のドメインロジックで使えるコードを移植する
- Unitテストを書く
- 実際に検証環境で運用して想定してなかった型がきていないかなどの確認
各段階の詳細は省きますが、1の項目は既存のコードを熟知しているメンバーから機能の必要性を的確に判断してもらい そしてコード上の知見、課題がチーム全員に共有されたのでよかったなと思っています。
また、es書き込みバッチ
ではUnitテストがしっかり書かれていたのでコードの移植もスムーズに行えました。
メソッドのアウトプットに対して型定義する
リファクタリングで心に残っているものについて紹介します。
es書き込みバッチ
では、 JSON形式の設定情報によって、様々なデータの形式を抽出する汎用的なメソッドがありました。
protected def applyScript(property: ExtractProperty, rawdata: Map[String, String])(value: String): Option[Any]
上記のメソッドではInt型、String型、Seq型、給与を表す型、JavaのList型など様々な型のアウトプットを返し、その結果返り値がOption[Any]
の型で、どんな値でも取れるような作りになっておりました。
しかしアウトプットの型がブラックボックス化して、コードが追えなくなるため返り値に対して型定義を行いました。
// 結果を表すトレイト trait ScriptResult sealed abstract class MVELResult extends ScriptResult object MVELResult { case class StringResult(value: String) extends MVELResult case class IntResult(value: Int) extends MVELResult case class SeqResult(value: Seq[String]) extends MVELResult case class JListResult(value: java.util.List[String]) extends MVELResult case class ArrayResult(value: Array[String]) extends MVELResult case class SalaryResult(value: Option[Salary]) extends MVELResult } //ScriptResultはMVELResult以外の型も返すことがあるため、MVELResultとScriptResultでわけております。 //ScriptResultをミックスインした結果の型定義を行うことで、他の文脈の型もとることができます。
上記は、コードの一部になります。
リファクタリングしたメソッドのシグニチャ
private def extractProperties( extractDefinition: ExtractDefinition, crawlingJob: CrawlingJob, parser: Parser ): Map[PropertyName, Option[ScriptResult]]
これでメソッドの結果の型が明確になり、コードが追いやすくなります。
リプレイスを行った結果
今回のes書き込みバッチ
は2ヶ月ほどでリプレイスを行うことができました。(コードの変更が1ヶ月、インフラの変更からリリースまで1ヶ月ほどでした。)
またメソッドに限らず、ドメインモデルを定義することで、処置中のコードも以下のような型が定義されて、各パイプラインでのデータの変化がわかるようになりました。
ドメインモデルを定義
// クローリングデータの求人情報 case class CrawlingJob( documentId: DocumentId, crawledResult: CrawledResult // 省略 ) // ETLを行った後の求人のクラス case class ExtractedJob( jobId: JobId, jobTitle: JobTitle // 省略 )
リプレイス後の各パイプラインの返り値の型
// jsonから求人データの抽出した後の型 Either[Exception, CrawlingJob] // フォーマットチェック後の型 Either[ExtractStageException, Job]
型ない状態だと実際のデータにどういった型が入っているのかを常に考えながら実装する必要があり、非常に頭を使います。 一方で型としてコードに落とし込めると、型情報を見ながら進められるので考えることを減らすことができます。
他にもリプレイスを通して以下のことを行いました。
- これまで依存している外部ライブラリの都合でScalaのバージョンが2.11系だったので、リプレイスを機にメンテナンスされていない外部ライブラリを切り離しScalaバージョンを2.13にあげることができました。
- フォーマットチェックではValidatedを使って簡潔にエラーの蓄積を実装できました。
- これまでEC2上にSparkのクラスターを作り運用していましたが、AWS EMRを使いクラスター管理がマネージドになりました。
- EMRクラスターの構築、ログ収集基盤の作成などは難易度が高かったのですが、マネージドになったことで特に手を加えることなく、クラスターを動かしているマシンが入れ替わっているので運用自体は楽になりました。
- 求人情報の正規化処理を別モジュールに分けて、求人を抽出するアプリケーションとしての責務に変更しました。
振り返り
5年前のコードであってもバージョンのアップグレードを行ったり、ドメインモデルを再定義することで既存のUnitTestを再活用することができ、コードを再生できました。 またこのリプレイスから2年ほど運用しましたが、ドメインモデルや処理の過程を型で定義することで求人項目の追加、削除しやすくなりました。以後も追加の開発を行えております。
Masterのcommitの状況
2021年以降から活発に開発も行われて、リプレイス後に開発の頻度が上がっていることがわかります。
スキーマレスなデータストアに書き込む仕様だと、型情報がなくても実現できるのですが、時間の経過に伴いメンテナンスが難しくなります。 その結果コードがレガシー化することにつながりやすいです。
もちろんまだまだ技術的な負債は存在している状態ですが、今回ドメインモデルとして型を定義して、責務を分解し、Unitテストを書くことで 過去のシステムの資産を使いながらコードを拡張できる状態にできました。
継続的な開発を行っていくために改めて、型定義、責務の見直し、Unitテストが重要であるということを認識しました。
スタンバイのプロダクトや組織について詳しく知りたい方は、気軽にご相談ください。 www.wantedly.com