我正在开发一个Azure函数blob触发器来从blob中的CSV读取列,并将列值传递给API请求以获得每个值的JSON响应,我想将每个响应写入到Azure blob存储中的新.JSON文件中。
以下是我的代码
import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient,BlobClient
import pandas as pd
import requests
import json
import os, io
def main(inputBlob: func.InputStream, outputBlob: func.Out[str]):
logging.info(f"Blob trigger executed!")
logging.info(f"Blob Name: {inputBlob.name} ({inputBlob.length}) bytes")
logging.info(f"Full Blob URI: {inputBlob.uri}")
# connections
connection_string = "DefaultEndpointsProtocol=https;AccountName=nhtsa;AccountKey=*********==;EndpointSuffix=core.windows.net"
containerName = "myblobcontainer"
blobName = "input/NHTSA_Inbound.csv"
out_blob = "output"
blob = BlobClient.from_connection_string(conn_str=connection_string, container_name=containerName, blob_name=blobName)
# reading csv column values and pass into request to get each value response
blobStream = blob.download_blob().content_as_bytes()
logging.info(blobStream)
df = pd.read_csv(io.BytesIO(blobStream), sep=',', dtype=str)
vin_df = pd.DataFrame(df['vin'])
vin_tup = list(vin_df.to_records(index=False))
for i,t in enumerate(vin_tup):
nhtsa_url = 'https://vpic.nhtsa.dot.gov/api/vehicles/DecodeVinValuesExtended/%s?format=json&modelyear=2011'%t[0]
nhtsa_req_content = requests.get(nhtsa_url)
nhtsa_data = nhtsa_req_content.json()
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
json.dump(list(list(nhtsa_data.values())[0]),output_file)除了文件写入部分之外,一切都运行得很好。我正在调试VS代码中的函数,而我的断点在此位置抛出错误
with open(os.path.join(outputBlob,'Veh_%s.json'%t),"w+") as output_file: json.dump(list(list(nhtsa_data.values(),output_file)
error
Executed 'Functions.BlobTrigger1' (Failed, Id=8c1dd2bb-7fb9-4670-9f44-4fb290530325, Duration=34741ms)
[2021-05-15T15:51:01.978Z] System.Private.CoreLib: Exception while executing function: Functions.BlobTrigger1. System.Private.CoreLib: Result: Failure
Exception: TypeError: expected str, bytes or os.PathLike object, not Out
Stack: File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 372, in _handle__invocation_request
self.__run_sync_func, invocation_id, fi.func, args)
File "C:\Program Files\Python37\lib\concurrent\futures\thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "C:\Program Files\Microsoft\Azure Functions Core Tools\workers\python\3.7/WINDOWS/X64\azure_functions_worker\dispatcher.py", line 548, in __run_sync_func
return func(**params)
File "C:\Users\yyy1u39\Desktop\Campaign Group Dashboard\Portfolio\Azure Data Factory\BlobTrigger1\__init__.py", line 35, in main
with open(os.path.join(outputBlob,'Veh_%s.json'%t[0]),"w+") as output_file:
File "C:\Program Files\Python37\lib\ntpath.py", line 76, in join
path = os.fspath(path)
.发布于 2021-05-17 09:52:18
建议使用Azure存储blob SDK上传多个请求结果,而不是Azure函数blob输出绑定。
只需尝试下面的代码:
import requests
from azure.storage.blob import ContainerClient
#skip reading csv
#Call demo API and save result individually
requestURLs =['https://reqres.in/api/users?page=1','https://reqres.in/api/users?page=2']
storageConnStr = ''
containerName =''
container_client = ContainerClient.from_connection_string(storageConnStr,containerName)
count = 0
for req in requestURLs:
result = requests.get(req).text
container_client.get_blob_client('output/req'+ str(count) + '.json').upload_blob(result)
count+=1结果:

如果你还有什么问题,请告诉我。
发布于 2021-05-17 01:53:58
你可以使用Azure storage blob SDK:https://docs.microsoft.com/en-us/azure/storage/blobs/storage-quickstart-blobs-python
https://stackoverflow.com/questions/67543669
复制相似问题