如何使用 Azure OpenAI
【Azure 微调示例】介绍了使用 Azure 端点完成微调操作的流程。文章包括设置 API、文件操作、微调操作、以及如何使用微调模型创建一个部署。示例代码详细讲解了各个操作的实现,同时也对使用 openAI 端点的操作进行了对比。本文可供开发人员参考和学习。
Azure 微调示例
在此示例中,我们将尝试查看可以使用 Azure 端点完成的所有操作及其与 openAi 端点(如果有)的差异。
此示例侧重于微调,但也涉及使用 API 可用的大多数操作。 这个例子是为了快速展示简单的操作,而不是作为微调模型适配教程。
import openai from openai import cli
设置
为了使以下部分正常工作,我们首先必须设置一些东西。 让我们从 api_base
和 api_version
开始。 要找到您的 api_base
,请转到 https://portal.azure.com,找到您的资源,然后在“资源管理”->“Keys and Endpoints”下查找“Endpoint”值。
openai.api_version = '2022-12-01' openai.api_base = '' # Please add your endpoint here
接下来我们必须设置 api_type
和 api_key
。 我们可以从门户网站获取密钥,也可以通过 Microsoft Active Directory 身份验证获取密钥。 取决于此,api_type
是 azure
或 azure_ad
。
设置:门户
让我们首先看看从门户中获取密钥。 转到 https://portal.azure.com,找到您的资源,然后在“资源管理”->“Keys and Endpoints”下查找“Keys”值之一。
openai.api_type = 'azure' openai.api_key = '' # Please add your api key here
(可选)设置:Microsoft Active Directory 身份验证
现在让我们看看如何通过 Microsoft Active Directory 身份验证获取密钥。 如果您想使用 Active Directory 身份验证而不是门户中的密钥,请取消注释以下代码。
# from azure.identity import DefaultAzureCredential # default_credential = DefaultAzureCredential() # token = default_credential.get_token("https://cognitiveservices.azure.com/.default") # openai.api_type = 'azure_ad' # openai.api_key = token.token
文件
在下一节中,我们将重点介绍文件操作:导入、列出、检索、删除。 为此,我们需要用一些示例数据创建 2 个临时文件。 为了简单起见,我们将使用相同的数据进行训练和验证。
import shutil import json training_file_name = 'training.jsonl' validation_file_name = 'validation.jsonl' sample_data = [{"prompt": "When I go to the store, I want an", "completion": "apple."}, {"prompt": "When I go to work, I want a", "completion": "coffee."}, {"prompt": "When I go home, I want a", "completion": "soda."}] print(f'Generating the training file: {training_file_name}') with open(training_file_name, 'w') as training_file: for entry in sample_data: json.dump(entry, training_file) training_file.write('\n') print(f'Copying the training file to the validation file') shutil.copy(training_file_name, validation_file_name)
文件:列表
列出所有上传的文件并检查名为“training.jsonl
”或“validation.jsonl
”的文件
print('Checking for existing uploaded files.') results = [] files = openai.File.list().data print(f'Found {len(files)} total uploaded files in the subscription.') for item in files: if item["filename"] in [training_file_name, validation_file_name]: results.append(item["id"]) print(f'Found {len(results)} already uploaded files that match our names.')
文件:删除
现在让我们删除那些找到的文件(如果有的话),因为接下来我们将重新上传它们。
print(f'Deleting already uploaded files...') for id in results: openai.File.delete(sid = id)
文件:导入和检索
现在,让我们导入我们的两个文件(“training.jsonl
”和“validation.jsonl
”)并保留这些 ID,因为我们稍后将使用它们进行微调。
对于此操作,我们将使用 cli 包装器,它会在上传前进行更多检查,并为我们提供进度。 此外,上传后我们将检查我们的导入状态,直到它成功(如果出现问题则失败)
import time def check_status(training_id, validation_id): train_status = openai.File.retrieve(training_id)["status"] valid_status = openai.File.retrieve(validation_id)["status"] print(f'Status (training_file | validation_file): {train_status} | {valid_status}') return (train_status, valid_status) #importing our two files training_id = cli.FineTune._get_or_upload(training_file_name, True) validation_id = cli.FineTune._get_or_upload(validation_file_name, True) #checking the status of the imports (train_status, valid_status) = check_status(training_id, validation_id) while train_status not in ["succeeded", "failed"] or valid_status not in ["succeeded", "failed"]: time.sleep(1) (train_status, valid_status) = check_status(training_id, validation_id)
文件:下载
现在让我们下载其中一个文件,例如训练文件,以检查导入过程中是否一切正常,所有位都在那里。
print(f'Downloading training file: {training_id}') result = openai.File.download(training_id) print(result.decode('utf-8'))
微调
在本节中,我们将使用我们在上一节中导入的两个训练和验证文件来训练微调模型。
微调:适应
首先让我们创建微调适配作业。
create_args = { "training_file": training_id, "validation_file": validation_id, "model": "babbage", "compute_classification_metrics": True, "classification_n_classes": 3, "n_epochs": 20, "batch_size": 3, "learning_rate_multiplier": 0.3 } resp = openai.FineTune.create(**create_args) job_id = resp["id"] status = resp["status"] print(f'Fine-tunning model with jobID: {job_id}.')
Finetune:流媒体
当作业运行时,我们可以订阅流式事件来检查操作的进度。
import signal import datetime def signal_handler(sig, frame): status = openai.FineTune.retrieve(job_id).status print(f"Stream interrupted. Job is still {status}.") return print(f'Streaming events for the fine-tuning job: {job_id}') signal.signal(signal.SIGINT, signal_handler) events = openai.FineTune.stream_events(job_id) try: for event in events: print(f'{datetime.datetime.fromtimestamp(event["created_at"])} {event["message"]}') except Exception: print("Stream interrupted (client disconnected).")
Finetune:列表和检索
现在让我们检查我们的操作是否成功,此外我们可以使用列表操作查看所有微调操作。
status = openai.FineTune.retrieve(id=job_id)["status"] if status not in ["succeeded", "failed"]: print(f'Job not in terminal status: {status}. Waiting.') while status not in ["succeeded", "failed"]: time.sleep(2) status = openai.FineTune.retrieve(id=job_id)["status"] print(f'Status: {status}') else: print(f'Finetune job {job_id} finished with status: {status}') print('Checking other finetune jobs in the subscription.') result = openai.FineTune.list() print(f'Found {len(result.data)} finetune jobs.')
Finetune:删除
最后我们可以删除我们的微调作业。
警告:如果您想继续下一节,请跳过此步骤,因为需要微调模型。 (删除代码默认注释掉)
# openai.FineTune.delete(sid=job_id)
部署
在本节中,我们将使用刚刚调整的微调模型创建一个部署,然后使用该部署创建一个简单的完成操作。
部署:创建
让我们使用微调模型创建部署。
#Fist let's get the model of the previous job: result = openai.FineTune.retrieve(id=job_id) if result["status"] == 'succeeded': model = result["fine_tuned_model"] # Now let's create the deployment print(f'Creating a new deployment with model: {model}') result = openai.Deployment.create(model=model, scale_settings={"scale_type":"standard"}) deployment_id = result["id"]
部署:检索
现在让我们检查新创建的部署的状态
print(f'Checking for deployment status.') resp = openai.Deployment.retrieve(id=deployment_id) status = resp["status"] print(f'Deployment {deployment_id} is with status: {status}')
部署:列表
现在因为创建新部署需要很长时间,让我们在订阅中查看已成功完成的部署。
print('While deployment running, selecting a completed one.') deployment_id = None result = openai.Deployment.list() for deployment in result.data: if deployment["status"] == "succeeded": deployment_id = deployment["id"] break if not deployment_id: print('No deployment with status: succeeded found.') else: print(f'Found a successful deployment with id: {deployment_id}.')
完成
现在让我们向部署发送示例完成。
print('Sending a test completion job') start_phrase = 'When I go home, I want a' response = openai.Completion.create(deployment_id=deployment_id, prompt=start_phrase, temperature=0, stop=".") text = response['choices'][0]['text'].replace('\n', '').replace(' .', '.').strip() print(f'"{start_phrase} {text}."')
部署:删除
最后让我们删除部署
print(f'Deleting deployment: {deployment_id}') openai.Deployment.delete(sid=deployment_id)
评论 (0)