開源 AI 食譜文件

從 MLflow 到 Ray Serve 的簽名感知模型服務

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

Open In Colab

從 MLflow 到 Ray Serve 的簽名感知模型服務

作者: Jonathan Jin

簡介

本筆記本探討了簡化模型從模型登錄檔部署的解決方案。對於希望隨著時間推移生產化許多模型的團隊來說,在 AI/ML 專案生命週期中這個“轉換點”上的投資可以顯著縮短上市時間。這對於可能沒有現有基礎設施來形成線上模型在生產中提供“黃金路徑”的年輕小型團隊來說可能很重要。

動機

最佳化模型生命週期的這一階段特別重要,因為最終結果面向生產。在此階段,您的模型實際上成為一個微服務。這意味著您現在需要處理服務所有權的各個要素,這可能包括:

  • 標準化和強制 API 向後相容性;
  • 日誌記錄、指標和一般可觀測性問題;
  • 等等。

每次您想部署新模型時都需要重複相同的通用設定,這將導致您和您的團隊的開發成本隨著時間的推移顯著增加。另一方面,考慮到生產模型所有權的“長尾”(假設生產化的模型不太可能很快退役),在這裡精簡投資可以隨著時間的推移帶來豐厚的回報。

鑑於以上所有內容,我們在這裡的探索動機是以下使用者故事:

我想使用只提供模型名稱的方式從模型登錄檔(例如 MLflow)部署模型。每次我想部署新模型時,需要複製的樣板和腳手架越少越好。我希望能夠動態地選擇不同版本的模型,而無需為適應這些新版本設定全新的部署。

元件

為此處的探索,我們將使用以下最小堆疊:

  • MLflow 用於模型註冊;
  • Ray Serve 用於模型服務。

出於演示目的,我們將只使用 Hugging Face Hub 中的現成開源模型。

我們今天將使用 GPU 進行推理,因為推理效能與我們今天的重點無關。毋庸置疑,在“實際生活中”,您可能無法僅憑 CPU 計算來提供模型服務。

現在讓我們安裝依賴項。

!pip install "transformers" "mlflow-skinny" "ray[serve]" "torch"

註冊模型

首先,讓我們定義今天探索要使用的模型。為簡單起見,我們將使用一個簡單的文字翻譯模型,其中源語言和目標語言在註冊時是可配置的。實際上,這意味著不同“版本”的模型可以註冊以翻譯不同的語言,但底層模型架構和權重可以保持不變。

import mlflow
from transformers import pipeline


class MyTranslationModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        self.lang_from = context.model_config.get("lang_from", "en")
        self.lang_to = context.model_config.get("lang_to", "de")

        self.input_label: str = context.model_config.get("input_label", "prompt")

        self.model_ref: str = context.model_config.get("hfhub_name", "google-t5/t5-base")

        self.pipeline = pipeline(
            f"translation_{self.lang_from}_to_{self.lang_to}",
            self.model_ref,
        )

    def predict(self, context, model_input, params=None):
        prompt = model_input[self.input_label].tolist()

        return self.pipeline(prompt)

(您可能想知道為什麼我們甚至費心使輸入標籤可配置。這將在稍後對我們有用。)

現在模型已經定義,讓我們註冊它的一個實際版本。這個特定版本將使用 Google 的 T5 Base 模型,並配置為從英語翻譯到德語

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": ["Hello my name is Jonathan."],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "de",
        },
    )

讓我們記錄這個確切的版本。這將在稍後有用。

en_to_de_version: str = str(model_info.registered_model_version)

註冊的模型元資料包含一些對我們有用的資訊。最值得注意的是,註冊的模型版本與一個嚴格的簽名相關聯,該簽名表示其輸入和輸出的預期形狀。這將在稍後對我們有用。

>>> print(model_info.signature)
inputs: 
  ['prompt': string (required)]
outputs: 
  ['translation_text': string (required)]
params: 
  None

提供模型服務

現在模型已在 MLflow 中註冊,讓我們使用 Ray Serve 設定我們的服務腳手架。目前,我們將“部署”限制為以下行為:

  • 從 MLflow 獲取選定的模型和版本;
  • 透過簡單的 REST API 接收推理請求並返回推理響應。
import mlflow
import pandas as pd

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class ModelDeployment:
    def __init__(self, model_name: str = "translation_model", default_version: str = "1"):
        self.model_name = model_name
        self.default_version = default_version

        self.model = mlflow.pyfunc.load_model(f"models:/{self.model_name}/{self.default_version}")

    @app.post("/serve")
    async def serve(self, input_string: str):
        return self.model.predict(pd.DataFrame({"prompt": [input_string]}))


deployment = ModelDeployment.bind(default_version=en_to_de_version)

您可能已經注意到,這裡將 "prompt" 硬編碼為輸入標籤,這在註冊模型的簽名和部署實現之間引入了隱藏的耦合。我們稍後會再討論這個問題。

現在,讓我們執行部署並試用它。

serve.run(deployment, blocking=False)
>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
... )

>>> print(response.json())
[{'translation_text': 'Das Wetter ist heute nett.'}]

這很好用,但您可能已經注意到 REST API 與模型簽名不一致。也就是說,它使用標籤 "input_string",而服務模型版本本身使用輸入標籤 "prompt"。同樣,模型可以接受多個輸入值,但 API 只接受一個。

如果這讓您感到不適,請繼續閱讀;我們稍後會再討論這個問題。

多個版本,一個端點

現在我們已經為模型建立了一個基本的端點。太棒了!但是,請注意,此部署嚴格繫結到此模型的單個版本——特別是已註冊的 translation_model 的版本 1

現在想象一下,您的團隊希望回來改進此模型——也許用新資料重新訓練它,或者將其配置為翻譯成新語言,例如法語而不是德語。這兩種情況都會導致 translation_model 的新版本註冊。但是,使用我們當前的部署實現,我們需要為 translation_model/2 設定一個全新的端點,要求我們的使用者記住哪個地址和埠對應哪個版本的模型,等等。換句話說:非常麻煩,非常容易出錯,非常 費力

相反,想象一種情況,我們可以重複使用完全相同的端點——相同的簽名、相同的地址和埠、相同的查詢約定等——來提供這兩個版本的模型服務。我們的使用者可以簡單地指定他們想要使用的模型版本,並且在使用者沒有明確請求某個版本的情況下,我們可以將其中一個視為“預設”版本。

這是 Ray Serve 憑藉其稱為模型多路複用的功能大放異彩的領域之一。實際上,這允許您載入模型的多個“版本”,根據需要動態地熱插拔它們,並在一段時間後解除安裝不使用的版本。換句話說,非常節省空間。

讓我們嘗試註冊模型的另一個版本——這次是一個從英語翻譯到法語的版本。我們將此版本註冊為 "2";模型伺服器將以這種方式檢索模型版本。

但首先,讓我們擴充套件模型伺服器以支援多路複用。

from ray import serve
from fastapi import FastAPI

app = FastAPI()


@serve.deployment
@serve.ingress(app)
class MultiplexedModelDeployment:

    @serve.multiplexed(max_num_models_per_replica=2)
    async def get_model(self, version: str):
        return mlflow.pyfunc.load_model(f"models:/{self.model_name}/{version}")

    def __init__(
        self,
        model_name: str = "translation_model",
        default_version: str = en_to_de_version,
    ):
        self.model_name = model_name
        self.default_version = default_version

    @app.post("/serve")
    async def serve(self, input_string: str):
        model = await self.get_model(serve.get_multiplexed_model_id())
        return model.predict(pd.DataFrame({"prompt": [input_string]}))
multiplexed_deployment = MultiplexedModelDeployment.bind(model_name="translation_model")
serve.run(multiplexed_deployment, blocking=False)

現在讓我們實際註冊新的模型版本。

import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "translation_model",
        registered_model_name="translation_model",
        python_model=MyTranslationModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "prompt": [
                    "Hello my name is Jon.",
                ],
            }
        ),
        model_config={
            "hfhub_name": "google-t5/t5-base",
            "lang_from": "en",
            "lang_to": "fr",
        },
    )

en_to_fr_version: str = str(model_info.registered_model_version)

註冊完成後,我們可以透過模型伺服器查詢它,如下所示:

>>> import requests

>>> response = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     params={"input_string": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": en_to_fr_version},
... )

>>> print(response.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

請注意,我們能夠立即訪問模型版本,而無需重新部署模型伺服器。Ray Serve 的多路複用功能使其能夠即時動態獲取模型權重;如果我從未請求版本 2,它就永遠不會被載入。這有助於為確實被查詢的模型節省計算資源。更有用的是,如果載入的模型數量超過了配置的最大值 (max_num_models_per_replica),則最近最少使用的模型版本將被逐出

鑑於我們上面設定了 max_num_models_per_replica=2,預設的英譯德模型版本應該仍然已載入並隨時可用於服務請求,沒有任何冷啟動時間。現在讓我們確認一下

>>> print(
...     requests.post(
...         "http://127.0.0.1:8000/serve/",
...         params={"input_string": "The weather is lovely today"},
...         headers={"serve_multiplexed_model_id": en_to_de_version},
...     ).json()
... )
[{'translation_text': 'Das Wetter ist heute nett.'}]

自動簽名

這一切都很好。但是,請注意以下摩擦點仍然存在:在定義伺服器時,我們需要為 API 本身定義一個全新的簽名。最好情況下,這只是模型簽名本身(已在 MLflow 中註冊)的一些程式碼重複。最壞情況下,這可能導致您的團隊或組織擁有的所有模型之間的 API 不一致,從而導致下游依賴項的混亂和挫敗感。

在這種特殊情況下,這意味著 MultiplexedModelDeployment 實際上是translation_model 的用例緊密耦合的。如果我們想部署另一組與語言翻譯無關的模型怎麼辦?已定義的 /serve API(返回一個類似 {"translated_text": "foo"} 的 JSON 物件)將不再有意義。

為了解決這個問題,如果 MultiplexedModelDeployment 的 API 簽名可以自動映象其所服務的基礎模型的簽名,那該怎麼辦?

幸運的是,藉助 MLflow 模型登錄檔元資料和一些 Python 動態類建立的技巧,這完全可以實現。

讓我們設定好,以便模型伺服器簽名可以從註冊模型本身推斷。由於 MLflow 的不同版本可以有不同的簽名,我們將使用“預設版本”來“固定”簽名;任何嘗試多路複用不相容簽名模型版本的行為都將丟擲錯誤。

由於 Ray Serve 在類定義時繫結請求和響應簽名,我們將使用 Python 元類將其設定為指定模型名稱和預設模型版本的函式。

import mlflow
import pydantic


def schema_to_pydantic(schema: mlflow.types.schema.Schema, *, name: str) -> pydantic.BaseModel:
    return pydantic.create_model(
        name, **{k: (v.type.to_python(), pydantic.Field(required=True)) for k, v in schema.input_dict().items()}
    )


def get_req_resp_signatures(
    model_signature: mlflow.models.ModelSignature,
) -> tuple[pydantic.BaseModel, pydantic.BaseModel]:
    inputs: mlflow.types.schema.Schema = model_signature.inputs
    outputs: mlflow.types.schema.Schema = model_signature.outputs

    return (schema_to_pydantic(inputs, name="InputModel"), schema_to_pydantic(outputs, name="OutputModel"))
import mlflow

from fastapi import FastAPI, Response, status
from ray import serve
from typing import List


def deployment_from_model_name(model_name: str, default_version: str = "1"):
    app = FastAPI()
    model_info = mlflow.models.get_model_info(f"models:/{model_name}/{default_version}")
    input_datamodel, output_datamodel = get_req_resp_signatures(model_info.signature)

    @serve.deployment
    @serve.ingress(app)
    class DynamicallyDefinedDeployment:

        MODEL_NAME: str = model_name
        DEFAULT_VERSION: str = default_version

        @serve.multiplexed(max_num_models_per_replica=2)
        async def get_model(self, model_version: str):
            model = mlflow.pyfunc.load_model(f"models:/{self.MODEL_NAME}/{model_version}")

            if model.metadata.get_model_info().signature != model_info.signature:
                raise ValueError(
                    f"Requested version {model_version} has signature incompatible with that of default version {self.DEFAULT_VERSION}"
                )
            return model

        # TODO: Extend this to support batching (lists of inputs and outputs)
        @app.post("/serve", response_model=List[output_datamodel])
        async def serve(self, model_input: input_datamodel, response: Response):
            model_id = serve.get_multiplexed_model_id()
            if model_id == "":
                model_id = self.DEFAULT_VERSION

            try:
                model = await self.get_model(model_id)
            except ValueError:
                response.status_code = status.HTTP_409_CONFLICT
                return [{"translation_text": "FAILED"}]

            return model.predict(model_input.dict())

    return DynamicallyDefinedDeployment


deployment = deployment_from_model_name("translation_model", default_version=en_to_fr_version)

serve.run(deployment.bind(), blocking=False)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"prompt": "The weather is lovely today"},
...     headers={"serve_multiplexed_model_id": str(en_to_fr_version)},
... )

>>> assert resp.ok
>>> assert resp.status_code == 200

>>> print(resp.json())
[{'translation_text': "Le temps est beau aujourd'hui"}]

現在讓我們確認一下我們設定的簽名檢查功能確實有效。為此,讓我們註冊一個具有略微不同簽名的相同模型。這應該足以觸發故障保護。

(還記得我們在練習開始時,將輸入標籤設定為可配置的嗎?這正是它最終發揮作用的地方。😎)

import pandas as pd

with mlflow.start_run():
    incompatible_version = str(
        mlflow.pyfunc.log_model(
            "translation_model",
            registered_model_name="translation_model",
            python_model=MyTranslationModel(),
            pip_requirements=["transformers"],
            input_example=pd.DataFrame(
                {
                    "text_to_translate": [
                        "Hello my name is Jon.",
                    ],
                }
            ),
            model_config={
                "input_label": "text_to_translate",
                "hfhub_name": "google-t5/t5-base",
                "lang_from": "en",
                "lang_to": "de",
            },
        ).registered_model_version
    )
import requests

resp = requests.post(
    "http://127.0.0.1:8000/serve/",
    json={"prompt": "The weather is lovely today"},
    headers={"serve_multiplexed_model_id": incompatible_version},
)
assert not resp.ok
resp.status_code == 409

assert resp.json()[0]["translation_text"] == "FAILED"

(從技術上講,“正確”的做法是實現一個響應容器,允許將“錯誤訊息”定義為實際響應的一部分,而不是像我們在這裡所做的那樣“濫用” `translation_text` 欄位。但是,出於演示目的,這樣做就可以了。)

為了徹底完成,讓我們嘗試註冊一個完全不同的模型——具有完全不同的簽名——並透過 deployment_from_model_name() 部署它。這將幫助我們確認整個簽名是從載入的模型中定義的。

import mlflow
from transformers import pipeline


class QuestionAnswererModel(mlflow.pyfunc.PythonModel):
    def load_context(self, context):

        self.model_context = context.model_config.get(
            "model_context",
            "My name is Hans and I live in Germany.",
        )
        self.model_name = context.model_config.get(
            "model_name",
            "deepset/roberta-base-squad2",
        )

        self.tokenizer_name = context.model_config.get(
            "tokenizer_name",
            "deepset/roberta-base-squad2",
        )

        self.pipeline = pipeline(
            "question-answering",
            model=self.model_name,
            tokenizer=self.tokenizer_name,
        )

    def predict(self, context, model_input, params=None):
        resp = self.pipeline(
            question=model_input["question"].tolist(),
            context=self.model_context,
        )

        return [resp] if type(resp) is not list else resp
import pandas as pd

with mlflow.start_run():
    model_info = mlflow.pyfunc.log_model(
        "question_answerer",
        registered_model_name="question_answerer",
        python_model=QuestionAnswererModel(),
        pip_requirements=["transformers"],
        input_example=pd.DataFrame(
            {
                "question": [
                    "Where do you live?",
                    "What is your name?",
                ],
            }
        ),
        model_config={
            "model_context": "My name is Hans and I live in Germany.",
        },
    )
>>> print(model_info.signature)
inputs: 
  ['question': string (required)]
outputs: 
  ['score': double (required), 'start': long (required), 'end': long (required), 'answer': string (required)]
params: 
  None
from ray import serve

serve.run(
    deployment_from_model_name(
        "question_answerer",
        default_version=str(model_info.registered_model_version),
    ).bind(),
    blocking=False,
)
>>> import requests

>>> resp = requests.post(
...     "http://127.0.0.1:8000/serve/",
...     json={"question": "The weather is lovely today"},
... )
>>> print(resp.json())
[{'score': 3.255764386267401e-05, 'start': 30, 'end': 38, 'answer': 'Germany.'}]

結論

在本筆記本中,我們利用 MLflow 對模型簽名的內建支援,極大地簡化了部署 HTTP 伺服器以線上提供模型服務的流程。我們利用 Ray Serve 強大但靈活的原始功能,使我們能夠一鍵部署一個具有以下功能的模型伺服器:

  • 版本多路複用;
  • 自動 REST API 簽名設定;
  • 防止使用不相容簽名模型版本的安全措施。

透過這樣做,我們展示了 Ray Serve 作為工具包的價值和潛力,您和您的團隊可以基於此“構建自己的機器學習平臺”。

我們還演示瞭如何減少同時使用多個工具所帶來的整合開銷和繁瑣工作。無縫整合是支援自包含、一體化平臺(如 AWS Sagemaker 或 GCP Vertex AI)的有力論據。我們已經證明,只要進行一些巧妙的工程設計,並以使用者(在本例中是 MLE)關心的痛點為指導原則,我們就可以獲得類似的好處,而無需將自己和團隊與昂貴的供應商合同繫結。

練習

  • 生成的 API 簽名與模型簽名非常相似,但仍存在一些不匹配。您能找出它在哪裡嗎?嘗試修復它。提示:當您嘗試向我們設定的問答端點傳遞多個問題時會發生什麼?
  • MLflow 模型簽名允許可選輸入。我們當前的實現沒有考慮到這一點。我們如何擴充套件這裡的實現來支援可選輸入?
  • 同樣,MLflow 模型簽名允許非輸入“推理引數”,而我們當前的實現也不支援。我們如何擴充套件我們的實現來支援推理引數?
  • 無論我們傳入什麼模型名稱和版本,我們每次生成新部署時都使用名稱 DynamicallyDefinedDeployment。這是一個問題嗎?如果是,您認為這種方法會產生什麼樣的問題?嘗試調整 deployment_from_model_name() 來處理這些問題。
< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.