我在烧瓶中有一个小服务,它是一个geoJson,并返回一个带有.shp、.dbf和.shx文件的zip。我可以使用函数gdf.to_file('file.shp')
返回这个压缩文件,但是我不想在磁盘中写入,因为它将花费更多的时间并缩短设备的寿命(我的服务将在一天内被调用超过一千次)。我该怎么做?下面是我使用的代码:
from flask import Flask, request, send_file
import json
import geopandas as gpd
import io
from zipfile import ZipFile
import os
app = Flask(__name__)
@app.route('/geojson2shp', methods=['POST'])
def result():
geojson = json.loads(request.values['geojson'])
gdf = gpd.GeoDataFrame.from_features(geojson['features'])
currentPath = os.path.dirname(os.path.abspath(__file__))
gdf.to_file(currentPath+'/tmp/file.shp', driver='ESRI Shapefile')
zipObj = ZipFile(currentPath+'/tmp/sample.zip', 'w')
zipObj.write(currentPath+'/tmp/file.shp')
zipObj.write(currentPath+'/tmp/file.dbf')
zipObj.write(currentPath+'/tmp/file.shx')
zipObj.close()
return send_file(currentPath+'/tmp/sample.zip', attachment_filename='sample.zip')
编辑:
我一直在寻找一种在Fiona中使用BytesIO的方法,多亏了@matthew-borish链接和@marat提示,我找到了它。通过下面的代码,我只使用内存创建了一个.shp文件。
def resultWithMemory():
geojson = json.loads(request.values['geojson'])
gdf = gpd.GeoDataFrame.from_features(geojson['features'])
shp = io.BytesIO()
gdf.to_file(shp, driver='ESRI Shapefile')
zip_buffer = io.BytesIO()
with ZipFile(zip_buffer, 'w') as zf:
zf.writestr("file.shp", shp.getbuffer())
zip_buffer.seek(0)
return send_file(zip_buffer, attachment_filename='sample.zip', as_attachment=True)
但是它有一个小问题:.shx和.dbf文件没有出现在"shp“BufferIO对象中。为什么?稍后我将研究Fiona源代码,并尝试找到它。
发布于 2022-01-24 18:41:36
您可以以时间戳作为密钥,将gdfs存储在一个小块中。
import datetime
gdf_dict = {}
def result():
# geojson = json.loads(request.values['geojson'])
# gdf = gpd.GeoDataFrame.from_features(geojson['features'])
# dummy df for demo purposes
gdf = gpd.GeoDataFrame(geometry=[LineString([(1, 1), (4, 4)]),
LineString([(1, 4), (4, 1)]),
LineString([(6, 1), (6, 6)])])
return gdf
每次调用服务时,都可以将结果存储在gdf_dict中。
gdf_dict[datetime.datetime.now()] = result()
如果你愿意,你可以保存到光盘偶尔用泡菜。
with open('gdf.pickle', 'wb') as handle:
pickle.dump(gdf_dict, handle)
反之亦然。
with open('gdf.pickle', 'rb') as handle:
from_pickle = pickle.load(handle)
如果您愿意的话,可以按时间戳进行搜索。
def get_gdfs(gdf_dict, from_date_str, to_date_str):
return sorted((k, v) for k, v in gdf_dict.items() if from_date_str <= k <= to_date_str)
gdfs_by_date = get_gdfs(gdf_dict, datetime.datetime(2022, 1, 21, 17, 46, 30, 408735), datetime.datetime(2023, 1, 22, 17, 48, 30, 408735))
或者,或者结合上面的一些内容,您可以使用pd.concat()组合gdfs和任何查询的datetimeindex。如果你对此很好奇,请留言,我可以提供更新。
https://stackoverflow.com/questions/70839961
复制