首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >亚马逊 MWS API 实战:商品详情精准获取与跨境电商数据整合方案(附多站点适配 + 签名避坑代码)

亚马逊 MWS API 实战:商品详情精准获取与跨境电商数据整合方案(附多站点适配 + 签名避坑代码)

原创
作者头像
互联网分享者
修改2025-09-22 16:37:07
修改2025-09-22 16:37:07
2340
举报

马逊作为全球最大的电商平台之一,其商品数据包含丰富的跨境贸易信息,包括多站点商品变体、国际物流选项、本地化价格策略等。与其他电商平台相比,亚马逊 MWS (Merchant Web Service) 接口体系更为复杂,但能提供更深入的商品数据维度。本文将系统讲解亚马逊商品详情接口的技术实现,重点解决多站点数据获取、签名认证、变体商品解析和价格趋势分析等核心问题,提供一套可直接应用于跨境电商选品和数据分析的完整解决方案。 一、接口基础信息与应用场景 接口核心信息

亚马逊 MWS 商品接口的关键技术参数:

   核心域名:根据站点不同而变化(如北美站https://mws.amazonservices.com)    认证方式:AWS4-HMAC-SHA256 签名机制    请求格式:HTTP GET/POST,XML 格式    响应格式:XML    编码格式:UTF-8    调用限制:根据 API 类型不同,通常 QPS=1-10,每日有调用上限

核心接口列表 接口名称 主要功能 适用场景 GetMatchingProduct 通过 ASIN 获取商品基本信息 快速查询商品基础数据 GetMatchingProductForId 通过多种 ID 类型获取商品信息 多 ID 体系下的商品匹配 GetProductCategoriesForASIN 获取商品分类信息 类目分析与分类统计 GetLowestPricedOffersForASIN 获取商品最低价信息 价格监控与竞品分析 GetCompetitivePricingForASIN 获取竞争价格数据 定价策略制定 典型应用场景

   跨境电商选品工具:分析不同站点的商品潜力与竞争度    价格监控系统:实时跟踪商品价格变动与趋势分析    库存管理系统:对接亚马逊库存数据,实现智能补货    多平台商品同步:将亚马逊商品信息同步至其他销售渠道    市场调研工具:分析特定类目的市场分布与竞争格局

接口调用流程

plaintext

   开发者账号注册 → MWS授权 → 密钥获取 → 请求签名生成 →     多站点接口调用 → XML响应解析 → 数据标准化 → 存储与分析

点击获取key和secret 二、接口认证与参数详解 AWS4-HMAC-SHA256 签名机制

亚马逊 MWS 采用复杂但安全的 AWS4 签名算法,具体流程:

   创建规范请求字符串(按特定格式组织请求参数)    创建签名上下文(包含请求日期、区域、服务等信息)    使用 Secret Key 生成签名密钥(通过 HMAC-SHA256 多次哈希)    计算签名(对规范请求和签名上下文进行哈希)    将签名添加到请求头或参数中

商品详情接口核心参数 参数名 类型 说明 是否必须 AWSAccessKeyId String MWS 访问密钥 是 Action String 接口名称,如 GetMatchingProduct 是 SellerId String 卖家账号 ID 是 SignatureVersion String 签名版本,固定为 2 是 Timestamp String 时间戳,格式为 ISO8601 是 Version String API 版本,如 2011-10-01 是 MarketplaceId String 市场 ID,如 US 市场为 ATVPDKIKX0DER 是 ASINList.ASIN.1 String 商品 ASIN 码 是 响应结果结构(简化版)

GetMatchingProduct 接口的核心响应字段:

xml

   <GetMatchingProductResponse xmlns="http://mws.amazonservices.com/schema/Products/2011-10-01">      <GetMatchingProductResult ASIN="B07XYZ1234" status="Success">        <Product>          <Identifiers>            <MarketplaceASIN>              <MarketplaceId>ATVPDKIKX0DER</MarketplaceId>              <ASIN>B07XYZ1234</ASIN>            </MarketplaceASIN>          </Identifiers>          <AttributeSets>            <ItemAttributes xml:lang="en-US">              <Title>Wireless Bluetooth Headphones with Noise Cancellation</Title>              <Brand>SoundMaster</Brand>              <Color>Black</Color>              <Model>SM-BT-001</Model>              <Price>                <Amount>79.99</Amount>                <CurrencyCode>USD</CurrencyCode>              </Price>              <NumberOfItems>1</NumberOfItems>              <ProductGroup>Electronics</ProductGroup>            </ItemAttributes>          </AttributeSets>          <Relationships>            <VariationParent>              <Identifiers>                <MarketplaceASIN>                  <MarketplaceId>ATVPDKIKX0DER</MarketplaceId>                  <ASIN>B07XYZ0000</ASIN>                </MarketplaceASIN>              </Identifiers>            </VariationParent>          </Relationships>          <SalesRankings>            <SalesRank>              <ProductCategoryId>electronics_display_on_website</ProductCategoryId>              <Rank>1250</Rank>            </SalesRank>          </SalesRankings>        </Product>      </GetMatchingProductResult>      <ResponseMetadata>        <RequestId>abc12345-6789-0123-4567-890abcdef123</RequestId>      </ResponseMetadata>    </GetMatchingProductResponse>

三、核心技术实现 1. AWS 签名工具类

python

运行

   import hmac    import hashlib    import urllib.parse    from datetime import datetime    class AmazonSigner:        """亚马逊MWS API签名工具类""" @staticmethod        def sign(secret_key, region, service, string_to_sign):            """            使用AWS4-HMAC-SHA256算法签名            :param secret_key: MWS密钥            :param region: 区域            :param service: 服务名称,通常为mws            :param string_to_sign: 待签名字符串            :return: 签名结果            """            # 生成签名密钥            date_stamp = datetime.utcnow().strftime('%Y%m%d')            k_date = hmac.new(('AWS4' + secret_key).encode('utf-8'),                              date_stamp.encode('utf-8'),                              hashlib.sha256).digest()            k_region = hmac.new(k_date, region.encode('utf-8'), hashlib.sha256).digest()            k_service = hmac.new(k_region, service.encode('utf-8'), hashlib.sha256).digest()            k_signing = hmac.new(k_service, 'aws4_request'.encode('utf-8'), hashlib.sha256).digest()            # 计算最终签名            signature = hmac.new(k_signing, string_to_sign.encode('utf-8'), hashlib.sha256).hexdigest()            return signature @staticmethod        def create_canonical_request(method, host, path, params, headers):            """创建规范请求字符串"""            # 1. 规范HTTP方法            canonical_method = method.upper()            # 2. 规范URI路径            canonical_uri = path if path else '/'            # 3. 规范查询字符串(参数按字母排序)            sorted_params = sorted(params.items(), key=lambda x: x[0])            canonical_querystring = '&'.join([f"{k}={AmazonSigner.percent_encode(v)}" for k, v in sorted_params])            # 4. 规范 headers            sorted_headers = sorted(headers.items(), key=lambda x: x[0].lower())            canonical_headers = ''.join([f"{k.lower()}:{v.strip()}\n" for k, v in sorted_headers])            # 5. 签名 headers            signed_headers = ';'.join([k.lower() for k, _ in sorted_headers])            # 6.  payload哈希(GET请求为空字符串)            payload_hash = hashlib.sha256(('' if method.upper() == 'GET' else '').encode('utf-8')).hexdigest()            # 组合规范请求            canonical_request = f"{canonical_method}\n{canonical_uri}\n{canonical_querystring}\n" \                               f"{canonical_headers}\n{signed_headers}\n{payload_hash}"            return canonical_request, signed_headers @staticmethod        def create_string_to_sign(canonical_request, region, service):            """创建待签名字符串"""            algorithm = 'AWS4-HMAC-SHA256'            request_date = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')            date_stamp = datetime.utcnow().strftime('%Y%m%d')            credential_scope = f"{date_stamp}/{region}/{service}/aws4_request"            hashed_canonical_request = hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()            string_to_sign = f"{algorithm}\n{request_date}\n{credential_scope}\n{hashed_canonical_request}"            return string_to_sign, algorithm, request_date, credential_scope @staticmethod        def percent_encode(value):            """百分比编码,符合AWS要求"""            if not value:                return ''            encoded = urllib.parse.quote(str(value), safe='-_.~')            # AWS要求将某些字符进行特殊编码            return encoded.replace('+', '%20').replace('*', '%2A').replace('%7E', '~')

2. 亚马逊商品接口客户端

python

运行

   import requests    import time    import xml.etree.ElementTree as ET    from datetime import datetime    from threading import Lock    import re    class AmazonProductClient:        """亚马逊商品接口客户端"""        # 亚马逊各站点信息        MARKETPLACES = {            'US': {                'id': 'ATVPDKIKX0DER',                'endpoint': 'https://mws.amazonservices.com',                'region': 'us-east-1'            },            'CA': {                'id': 'A2EUQ1WTGCTBG2',                'endpoint': 'https://mws.amazonservices.ca',                'region': 'us-east-1'            },            'UK': {                'id': 'A1F83G8C2ARO7P',                'endpoint': 'https://mws-eu.amazonservices.com',                'region': 'eu-west-1'            },            'DE': {                'id': 'A1PA6795UKMFR9',                'endpoint': 'https://mws-eu.amazonservices.com',                'region': 'eu-west-1'            },            'JP': {                'id': 'A1VC38T7YXB528',                'endpoint': 'https://mws.amazonservices.jp',                'region': 'us-west-2'            }        }        # API版本        API_VERSIONS = {            'products': '2011-10-01',            'pricing': '2011-10-01'        }        def __init__(self, access_key, secret_key, seller_id, marketplace='US'):            self.access_key = access_key            self.secret_key = secret_key            self.seller_id = seller_id            self.set_marketplace(marketplace)            self.timeout = 30  # 超时时间(秒)            self.qps_limit = 1  # 初始QPS限制,可根据实际情况调整            self.last_request_time = 0            self.request_lock = Lock()  # 线程锁控制QPS            self.namespace = {                'ns': 'http://mws.amazonservices.com/schema/Products/2011-10-01',                'pricing': 'http://mws.amazonservices.com/schema/Products/2011-10-01/CompetitivePricingType'            }        def set_marketplace(self, marketplace):            """设置市场"""            if marketplace not in self.MARKETPLACES:                raise ValueError(f"不支持的市场: {marketplace},支持的市场: {list(self.MARKETPLACES.keys())}")            self.marketplace = marketplace            self.marketplace_id = self.MARKETPLACES[marketplace]['id']            self.endpoint = self.MARKETPLACES[marketplace]['endpoint']            self.region = self.MARKETPLACES[marketplace]['region']        def _check_qps(self):            """检查并控制QPS"""            with self.request_lock:                current_time = time.time()                interval = 1.0 / self.qps_limit  # 每次请求最小间隔                elapsed = current_time - self.last_request_time                if elapsed < interval:                    # 需要等待的时间                    time.sleep(interval - elapsed)                self.last_request_time = current_time        def _get_base_params(self, action, api_type='products'):            """获取基础参数"""            return {                'AWSAccessKeyId': self.access_key,                'Action': action,                'SellerId': self.seller_id,                'SignatureVersion': '2',                'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),                'Version': self.API_VERSIONS.get(api_type, '2011-10-01'),                'MarketplaceId': self.marketplace_id            }        def _send_request(self, path, params, method='GET'):            """发送请求并处理响应"""            # 检查QPS限制            self._check_qps()            # 准备headers            headers = {                'Host': self.endpoint.split('//')[1],                'User-Agent': 'AmazonProductClient/1.0 (Language=Python)'            }            # 创建规范请求            canonical_request, signed_headers = AmazonSigner.create_canonical_request(                method, headers['Host'], path, params, headers            )            # 创建待签名字符串            string_to_sign, algorithm, request_date, credential_scope = AmazonSigner.create_string_to_sign(                canonical_request, self.region, 'mws'            )            # 计算签名            signature = AmazonSigner.sign(self.secret_key, self.region, 'mws', string_to_sign)            # 添加签名到参数            params['Signature'] = signature            # 构建请求URL            url = f"{self.endpoint}{path}"            try:                # 发送请求                if method.upper() == 'GET':                    response = requests.get(                        url,                        params=params,                        headers=headers,                        timeout=self.timeout                    )                else:                    response = requests.post(                        url,                        data=params,                        headers=headers,                        timeout=self.timeout                    )                # 检查响应状态                response.raise_for_status()                # 解析XML响应                return self._parse_xml_response(response.text)            except requests.exceptions.RequestException as e:                print(f"请求异常: {str(e)}")                # 处理可能的XML错误响应                if hasattr(e, 'response') and e.response is not None:                    try:                        error_xml = self._parse_xml_response(e.response.text)                        print(f"API错误响应: {error_xml}")                    except:                        pass                return None        def _parse_xml_response(self, xml_content):            """解析XML响应"""            try:                root = ET.fromstring(xml_content)                return self._xml_to_dict(root)            except ET.ParseError as e:                print(f"XML解析错误: {str(e)}")                print(f"错误XML内容: {xml_content[:500]}...")                return None        def _xml_to_dict(self, element):            """将XML元素转换为字典"""            result = {}            # 处理属性            if element.attrib:                result['@attributes'] = element.attrib            # 处理子元素            children = list(element)            if children:                for child in children:                    child_data = self._xml_to_dict(child)                    tag = child.tag.split('}')[-1]  # 去除命名空间                    if tag in result:                        # 如果标签已存在,转换为列表                        if not isinstance(result[tag], list):                            result[tag] = [result[tag]]                        result[tag].append(child_data)                    else:                        result[tag] = child_data            # 处理文本内容            elif element.text and element.text.strip():                result['#text'] = element.text.strip()            return result        def get_product_by_asin(self, asin):            """            通过ASIN获取商品详情            :param asin: 商品ASIN码            :return: 商品详情字典            """            if not asin or not re.match(r'^[A-Z0-9]{10}$', asin):                raise ValueError(f"无效的ASIN格式: {asin}")            # 接口参数            params = self._get_base_params('GetMatchingProduct')            params['ASINList.ASIN.1'] = asin            # 发送请求            response = self._send_request('/Products/2011-10-01', params)            # 解析结果            if not response:                return None            # 提取商品数据            result_key = 'GetMatchingProductResult'            if result_key not in response:                return None            # 处理可能的列表形式结果            products = response[result_key]            if not isinstance(products, list):                products = [products]            for product in products:                # 检查ASIN是否匹配且请求成功                if ('@attributes' in product and                     product['@attributes'].get('ASIN') == asin and                     product['@attributes'].get('status') == 'Success' and                    'Product' in product):                    return self._parse_product_data(product['Product'])            return None        def get_competitive_pricing(self, asin):            """获取商品竞争价格"""            if not asin or not re.match(r'^[A-Z0-9]{10}$', asin):                raise ValueError(f"无效的ASIN格式: {asin}")            # 接口参数            params = self._get_base_params('GetCompetitivePricingForASIN', 'pricing')            params['ASINList.ASIN.1'] = asin            # 发送请求            response = self._send_request('/Products/2011-10-01', params)            # 解析结果            if not response or 'GetCompetitivePricingForASINResult' not in response:                return None            results = response['GetCompetitivePricingForASINResult']            if not isinstance(results, list):                results = [results]            for result in results:                if ('@attributes' in result and                     result['@attributes'].get('ASIN') == asin and                     result['@attributes'].get('status') == 'Success' and                    'Product' in result):                    return self._parse_pricing_data(result['Product'])            return None        def _parse_product_data(self, product_data):            """解析商品数据"""            if not product_data:                return None            # 提取标识符信息            identifiers = {}            if 'Identifiers' in product_data and 'MarketplaceASIN' in product_data['Identifiers']:                mp_asin = product_data['Identifiers']['MarketplaceASIN']                identifiers = {                    'asin': mp_asin.get('ASIN', {}).get('#text', ''),                    'marketplace_id': mp_asin.get('MarketplaceId', {}).get('#text', '')                }            # 提取属性信息            attributes = {}            if 'AttributeSets' in product_data and 'ItemAttributes' in product_data['AttributeSets']:                item_attrs = product_data['AttributeSets']['ItemAttributes']                # 处理多语言情况                if isinstance(item_attrs, list):                    # 优先选择英文,没有则选第一个                    en_attrs = next((a for a in item_attrs if '@attributes' in a and a['@attributes'].get('xml:lang') == 'en-US'), None)                    item_attrs = en_attrs if en_attrs else item_attrs[0]                attributes = {                    'title': item_attrs.get('Title', {}).get('#text', ''),                    'brand': item_attrs.get('Brand', {}).get('#text', ''),                    'model': item_attrs.get('Model', {}).get('#text', ''),                    'color': item_attrs.get('Color', {}).get('#text', ''),                    'size': item_attrs.get('Size', {}).get('#text', ''),                    'description': item_attrs.get('Description', {}).get('#text', ''),                    'category': item_attrs.get('ProductGroup', {}).get('#text', ''),                    'price': self._parse_price(item_attrs.get('Price', {})),                    'number_of_items': int(item_attrs.get('NumberOfItems', {}).get('#text', 1))                }            # 提取变体信息            relationships = {}            if 'Relationships' in product_data:                # 检查是否为变体商品                if 'VariationParent' in product_data['Relationships']:                    parent_asin = product_data['Relationships']['VariationParent']['Identifiers']['MarketplaceASIN']['ASIN']['#text']                    relationships['is_variation'] = True                    relationships['parent_asin'] = parent_asin                # 检查是否有子变体                elif 'Variations' in product_data['Relationships']:                    variations = product_data['Relationships']['Variations']                    if 'Variation' in variations:                        variation_list = variations['Variation']                        if not isinstance(variation_list, list):                            variation_list = [variation_list]                        relationships['variations'] = [                            v['Identifiers']['MarketplaceASIN']['ASIN']['#text']                             for v in variation_list                        ]                        relationships['is_parent'] = True            # 提取销售排名            sales_rank = []            if 'SalesRankings' in product_data and 'SalesRank' in product_data['SalesRankings']:                ranks = product_data['SalesRankings']['SalesRank']                if not isinstance(ranks, list):                    ranks = [ranks]                for rank in ranks:                    sales_rank.append({                        'category_id': rank.get('ProductCategoryId', {}).get('#text', ''),                        'category_name': rank.get('ProductCategoryName', {}).get('#text', ''),                        'rank': int(rank.get('Rank', {}).get('#text', 0))                    })            return {                'identifiers': identifiers,                'attributes': attributes,                'relationships': relationships,                'sales_rank': sales_rank,                'marketplace': self.marketplace,                'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')            }        def _parse_pricing_data(self, pricing_data):            """解析价格数据"""            if not pricing_data or 'CompetitivePricing' not in pricing_data:                return None            result = {                'asin': pricing_data['Identifiers']['MarketplaceASIN']['ASIN']['#text'],                'marketplace': self.marketplace,                'prices': [],                'fetch_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')            }            competitive_pricing = pricing_data['CompetitivePricing']            if 'CompetitivePriceList' in competitive_pricing:                price_list = competitive_pricing['CompetitivePriceList']['CompetitivePrice']                if not isinstance(price_list, list):                    price_list = [price_list]                for price_item in price_list:                    if 'Price' in price_item:                        price_data = price_item['Price']                        result['prices'].append({                            'type': price_item.get('@attributes', {}).get('type', 'unknown'),                            'amount': float(price_data.get('Amount', {}).get('#text', 0)),                            'currency': price_data.get('CurrencyCode', {}).get('#text', ''),                            'condition': price_data.get('Condition', {}).get('#text', ''),                            'is_offer': 'OfferListingCount' in price_data                        })            # 提取最低价格            if 'LowestPrices' in competitive_pricing:                lowest_prices = competitive_pricing['LowestPrices']['LowestPrice']                if not isinstance(lowest_prices, list):                    lowest_prices = [lowest_prices]                result['lowest_prices'] = [                    {                        'condition': lp.get('Condition', {}).get('#text', ''),                        'price': self._parse_price(lp.get('Price', {})),                        'offer_count': int(lp.get('OfferCount', {}).get('#text', 0))                    } for lp in lowest_prices                ]            return result        def _parse_price(self, price_data):            """解析价格数据"""            if not price_data:                return None            return {                'amount': float(price_data.get('Amount', {}).get('#text', 0)),                'currency': price_data.get('CurrencyCode', {}).get('#text', '')            }

3. 亚马逊商品数据管理工具

python

运行

   import os    import json    import sqlite3    from datetime import datetime, timedelta    import time    from concurrent.futures import ThreadPoolExecutor    import pandas as pd    class AmazonProductManager:        """亚马逊商品数据管理工具"""        def __init__(self, access_key, secret_key, seller_id, default_marketplace='US',                      cache_dir="./amazon_cache"):            self.client = AmazonProductClient(access_key, secret_key, seller_id, default_marketplace)            self.cache_dir = cache_dir            self.db_path = os.path.join(cache_dir, "amazon_products.db")            self._init_cache()            # 支持的市场            self.supported_markets = list(AmazonProductClient.MARKETPLACES.keys())        def _init_cache(self):            """初始化缓存数据库"""            if not os.path.exists(self.cache_dir):                os.makedirs(self.cache_dir)            # 连接数据库            conn = sqlite3.connect(self.db_path)            cursor = conn.cursor()            # 创建商品缓存表            cursor.execute('''            CREATE TABLE IF NOT EXISTS product_cache (                asin TEXT,                marketplace TEXT,                data TEXT,                fetch_time TEXT,                PRIMARY KEY (asin, marketplace)            )            ''')            # 创建价格缓存表            cursor.execute('''            CREATE TABLE IF NOT EXISTS price_cache (                asin TEXT,                marketplace TEXT,                data TEXT,                fetch_time TEXT,                PRIMARY KEY (asin, marketplace, fetch_time)            )            ''')            # 创建类目缓存表            cursor.execute('''            CREATE TABLE IF NOT EXISTS category_cache (                category_id TEXT PRIMARY KEY,                name TEXT,                marketplace TEXT,                data TEXT,                fetch_time TEXT            )            ''')            conn.commit()            conn.close()        def get_product_detail(self, asin, marketplace=None, use_cache=True, cache_ttl=3600):            """            获取商品详情            :param asin: 商品ASIN            :param marketplace: 市场代码,如US, UK            :param use_cache: 是否使用缓存            :param cache_ttl: 缓存有效期(秒)            :return: 商品详情字典            """            marketplace = marketplace or self.client.marketplace            # 尝试从缓存获取            if use_cache:                cached_data = self._get_cached_data(                    "product_cache",                     {"asin": asin, "marketplace": marketplace},                     cache_ttl                )                if cached_data:                    print(f"使用缓存数据,ASIN: {asin}, 市场: {marketplace}")                    return cached_data            # 切换市场(如果需要)            original_marketplace = self.client.marketplace            if marketplace != original_marketplace:                self.client.set_marketplace(marketplace)            # 从接口获取            print(f"调用接口获取商品详情,ASIN: {asin}, 市场: {marketplace}")            result = self.client.get_product_by_asin(asin)            # 恢复原始市场            if marketplace != original_marketplace:                self.client.set_marketplace(original_marketplace)            # 更新缓存            if result:                self._update_product_cache(asin, marketplace, result)            return result        def get_product_pricing(self, asin, marketplace=None, use_cache=True, cache_ttl=300):            """获取商品价格信息"""            marketplace = marketplace or self.client.marketplace            # 尝试从缓存获取            if use_cache:                cached_data = self._get_cached_data(                    "price_cache",                     {"asin": asin, "marketplace": marketplace},                     cache_ttl,                    latest_only=True                )                if cached_data:                    print(f"使用缓存价格数据,ASIN: {asin}, 市场: {marketplace}")                    return cached_data            # 切换市场(如果需要)            original_marketplace = self.client.marketplace            if marketplace != original_marketplace:                self.client.set_marketplace(marketplace)            # 从接口获取            print(f"调用接口获取商品价格,ASIN: {asin}, 市场: {marketplace}")            result = self.client.get_competitive_pricing(asin)            # 恢复原始市场            if marketplace != original_marketplace:                self.client.set_marketplace(original_marketplace)            # 更新缓存            if result:                self._update_price_cache(asin, marketplace, result)            return result        def get_variations(self, parent_asin, marketplace=None, use_cache=True):            """获取变体商品列表"""            marketplace = marketplace or self.client.marketplace            # 获取父商品信息            parent_product = self.get_product_detail(parent_asin, marketplace, use_cache)            if not parent_product or 'variations' not in parent_product['relationships']:                return []            variation_asins = parent_product['relationships']['variations']            if not variation_asins:                return []            # 批量获取变体商品信息            variations = []            for asin in variation_asins:                variation = self.get_product_detail(asin, marketplace, use_cache)                if variation:                    # 获取变体价格                    pricing = self.get_product_pricing(asin, marketplace, use_cache)                    if pricing:                        variation['pricing'] = pricing                    variations.append(variation)            return {                'parent_asin': parent_asin,                'marketplace': marketplace,                'variations': variations,                'count': len(variations)            }        def batch_get_products(self, asins, marketplace=None, max_workers=2, **kwargs):            """批量获取商品详情"""            if not asins:                return []            marketplace = marketplace or self.client.marketplace            # 使用线程池并行获取            with ThreadPoolExecutor(max_workers=max_workers) as executor:                futures = [                    executor.submit(                        self.get_product_detail,                         asin,                         marketplace=marketplace,** kwargs                    ) for asin in asins                ]                # 收集结果                results = []                for future in futures:                    try:                        product = future.result()                        if product:                            results.append(product)                    except Exception as e:                        print(f"获取商品详情异常: {str(e)}")            return results        def compare_products_across_markets(self, asin, markets=None):            """跨市场比较商品信息"""            markets = markets or ['US', 'UK', 'DE', 'JP']            markets = [m for m in markets if m in self.supported_markets]            if not markets:                return None            # 获取各市场的商品信息            market_data = {}            for market in markets:                product = self.get_product_detail(asin, market)                if product:                    pricing = self.get_product_pricing(asin, market)                    market_data[market] = {                        'product': product,                        'pricing': pricing                    }            return {                'asin': asin,                'markets': market_data,                'comparison_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')            }        def analyze_price_trend(self, asin, marketplace=None, days=7):            """分析商品价格趋势"""            marketplace = marketplace or self.client.marketplace            # 从缓存获取历史价格数据            conn = sqlite3.connect(self.db_path)            query = '''            SELECT data, fetch_time FROM price_cache             WHERE asin = ? AND marketplace = ?             AND fetch_time >= ?            ORDER BY fetch_time ASC            '''            # 计算起始日期            start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')            cursor = conn.execute(query, (asin, marketplace, start_date))            records = cursor.fetchall()            conn.close()            if not records:                return None            # 解析历史数据            trend_data = []            for data_str, fetch_time in records:                try:                    data = json.loads(data_str)                    if 'lowest_prices' in data and data['lowest_prices']:                        # 取第一个最低价格                        lowest_price = data['lowest_prices'][0]['price']                        trend_data.append({                            'date': fetch_time,                            'amount': lowest_price['amount'],                            'currency': lowest_price['currency']                        })                except Exception as e:                    print(f"解析历史价格数据失败: {str(e)}")            if not trend_data:                return None            # 转换为DataFrame进行分析            df = pd.DataFrame(trend_data)            df['date'] = pd.to_datetime(df['date'])            # 计算统计数据            stats = {                'start_date': df['date'].min().strftime('%Y-%m-%d'),                'end_date': df['date'].max().strftime('%Y-%m-%d'),                'record_count': len(df),                'min_price': df['amount'].min(),                'max_price': df['amount'].max(),                'avg_price': df['amount'].mean(),                'current_price': df.iloc[-1]['amount'],                'currency': trend_data[0]['currency'],                'trend': 'up' if df.iloc[-1]['amount'] > df.iloc[0]['amount'] else                          'down' if df.iloc[-1]['amount'] < df.iloc[0]['amount'] else 'stable'            }            return {                'asin': asin,                'marketplace': marketplace,                'trend_data': trend_data,                'statistics': stats,                'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')            }        def _get_cached_data(self, table, filters, ttl, latest_only=False):            """从缓存获取数据"""            conn = sqlite3.connect(self.db_path)            cursor = conn.cursor()            # 构建查询条件            where_clause = " AND ".join([f"{k} = ?" for k in filters.keys()])            params = list(filters.values())            query = f"SELECT data, fetch_time FROM {table} WHERE {where_clause}"            # 按时间排序            if latest_only:                query += " ORDER BY fetch_time DESC LIMIT 1"            cursor.execute(query, params)            records = cursor.fetchall()            conn.close()            if not records:                return None            # 检查缓存是否过期            expire_time = (datetime.now() - timedelta(seconds=ttl)).strftime('%Y-%m-%d %H:%M:%S')            for data_str, fetch_time in records:                if fetch_time >= expire_time:                    try:                        return json.loads(data_str)                    except:                        continue            return None        def _update_product_cache(self, asin, marketplace, data):            """更新商品缓存"""            if not data or 'identifiers' not in data or data['identifiers'].get('asin') != asin:                return            conn = sqlite3.connect(self.db_path)            cursor = conn.cursor()            data_str = json.dumps(data, ensure_ascii=False)            fetch_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')            cursor.execute('''            INSERT OR REPLACE INTO product_cache             (asin, marketplace, data, fetch_time)            VALUES (?, ?, ?, ?)            ''', (asin, marketplace, data_str, fetch_time))            conn.commit()            conn.close()        def _update_price_cache(self, asin, marketplace, data):            """更新价格缓存"""            if not data or data.get('asin') != asin:                return            conn = sqlite3.connect(self.db_path)            cursor = conn.cursor()            data_str = json.dumps(data, ensure_ascii=False)            fetch_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')            cursor.execute('''            INSERT INTO price_cache             (asin, marketplace, data, fetch_time)            VALUES (?, ?, ?, ?)            ''', (asin, marketplace, data_str, fetch_time))            conn.commit()            conn.close()        def clean_expired_cache(self, product_ttl=86400, price_ttl=604800):            """            清理过期缓存            :param product_ttl: 商品缓存有效期(秒),默认1天            :param price_ttl: 价格缓存有效期(秒),默认7天            """            conn = sqlite3.connect(self.db_path)            cursor = conn.cursor()            # 计算过期时间            product_expire = (datetime.now() - timedelta(seconds=product_ttl)).strftime('%Y-%m-%d %H:%M:%S')            price_expire = (datetime.now() - timedelta(seconds=price_ttl)).strftime('%Y-%m-%d %H:%M:%S')            # 清理商品缓存            cursor.execute("DELETE FROM product_cache WHERE fetch_time < ?", (product_expire,))            deleted_products = cursor.rowcount            # 清理价格缓存            cursor.execute("DELETE FROM price_cache WHERE fetch_time < ?", (price_expire,))            deleted_prices = cursor.rowcount            conn.commit()            conn.close()            print(f"清理过期缓存完成,删除商品缓存 {deleted_products} 条,价格缓存 {deleted_prices} 条")            return {                "deleted_products": deleted_products,                "deleted_prices": deleted_prices            }

四、完整使用示例 1. 商品详情获取与多市场对比

python

运行

   def amazon_product_demo():        # 替换为实际的MWS凭证        ACCESS_KEY = "your_access_key"        SECRET_KEY = "your_secret_key"        SELLER_ID = "your_seller_id"        # 初始化商品管理器        product_manager = AmazonProductManager(            ACCESS_KEY,             SECRET_KEY,             SELLER_ID,            default_marketplace='US'        )        # 示例ASIN - 替换为实际的商品ASIN        ASIN = "B07XYZ1234"        try:            # 1. 获取美国站商品详情            print("===== 获取美国站商品详情 =====")            us_product = product_manager.get_product_detail(ASIN, marketplace='US')            if us_product:                print(f"ASIN: {us_product['identifiers']['asin']}")                print(f"标题: {us_product['attributes']['title']}")                print(f"品牌: {us_product['attributes']['brand']}")                print(f"类别: {us_product['attributes']['category']}")                print(f"价格: {us_product['attributes']['price']['amount']} {us_product['attributes']['price']['currency']}")                print(f"销售排名: {us_product['sales_rank'][0]['rank']} ({us_product['sales_rank'][0]['category_id']})")                # 检查是否为变体商品                if 'is_variation' in us_product['relationships'] and us_product['relationships']['is_variation']:                    print(f"该商品是变体,父ASIN: {us_product['relationships']['parent_asin']}")                # 检查是否有子变体                if 'is_parent' in us_product['relationships'] and us_product['relationships']['is_parent']:                    print(f"该商品有 {len(us_product['relationships']['variations'])} 个子变体")            # 2. 获取商品价格信息            print("\n===== 获取商品价格信息 =====")            pricing = product_manager.get_product_pricing(ASIN, marketplace='US')            if pricing and 'lowest_prices' in pricing:                print(f"最低价格: {pricing['lowest_prices'][0]['price']['amount']} {pricing['lowest_prices'][0]['price']['currency']}")                print(f"最低价格条件: {pricing['lowest_prices'][0]['condition']}")                print(f"提供该价格的卖家数: {pricing['lowest_prices'][0]['offer_count']}")            # 3. 跨市场比较            print("\n===== 跨市场商品比较 =====")            market_comparison = product_manager.compare_products_across_markets(                ASIN,                 markets=['US', 'UK', 'DE']            )            if market_comparison:                for market, data in market_comparison['markets'].items():                    if 'product' in data and 'pricing' in data and data['pricing']['lowest_prices']:                        print(f"\n{market}市场:")                        print(f"标题: {data['product']['attributes']['title']}")                        print(f"最低价格: {data['pricing']['lowest_prices'][0]['price']['amount']} {data['pricing']['lowest_prices'][0]['price']['currency']}")                        if data['product']['sales_rank']:                            print(f"销售排名: {data['product']['sales_rank'][0]['rank']}")            # 4. 分析价格趋势            print("\n===== 价格趋势分析 =====")            price_trend = product_manager.analyze_price_trend(ASIN, marketplace='US', days=7)            if price_trend:                stats = price_trend['statistics']                print(f"分析周期: {stats['start_date']} 至 {stats['end_date']}")                print(f"价格范围: {stats['min_price']} - {stats['max_price']} {stats['currency']}")                print(f"平均价格: {stats['avg_price']:.2f} {stats['currency']}")                print(f"当前价格: {stats['current_price']} {stats['currency']}")                print(f"价格趋势: {stats['trend']}")            # 5. 如果是父商品,获取变体信息            if us_product and 'is_parent' in us_product['relationships'] and us_product['relationships']['is_parent']:                print("\n===== 获取变体商品信息 =====")                variations = product_manager.get_variations(ASIN, marketplace='US')                if variations and variations['variations']:                    print(f"共找到 {variations['count']} 个变体商品:")                    for i, var in enumerate(variations['variations'], 1):                        var_price = var.get('pricing', {}).get('lowest_prices', [{}])[0].get('price', {})                        print(f"\n变体 {i}:")                        print(f"ASIN: {var['identifiers']['asin']}")                        print(f"属性: {var['attributes']['color']} / {var['attributes']['size']}")                        print(f"价格: {var_price.get('amount', 'N/A')} {var_price.get('currency', '')}")            # 6. 清理过期缓存            print("\n===== 清理过期缓存 =====")            product_manager.clean_expired_cache()        except Exception as e:            print(f"操作失败: {str(e)}")    if __name__ == "__main__":        amazon_product_demo()

2. 批量商品分析与选品辅助

python

运行

   def batch_product_analysis_demo():        # 替换为实际的MWS凭证        ACCESS_KEY = "your_access_key"        SECRET_KEY = "your_secret_key"        SELLER_ID = "your_seller_id"        # 初始化商品管理器        product_manager = AmazonProductManager(            ACCESS_KEY,             SECRET_KEY,             SELLER_ID,            default_marketplace='US'        )        # 要分析的ASIN列表        ASIN_LIST = [            "B07XYZ1234",            "B08ABC5678",            "B09DEF9012",            "B10GHI3456",            "B11JKL7890"        ]        try:            # 批量获取商品信息            print("===== 批量获取商品信息 =====")            products = product_manager.batch_get_products(                ASIN_LIST,                 marketplace='US',                max_workers=2            )            print(f"成功获取 {len(products)}/{len(ASIN_LIST)} 个商品信息")            # 分析每个商品并生成报告            analysis_report = []            for product in products:                # 获取价格信息                pricing = product_manager.get_product_pricing(                    product['identifiers']['asin'],                     marketplace='US'                )                # 提取关键指标                lowest_price = None                offer_count = 0                if pricing and 'lowest_prices' in pricing and pricing['lowest_prices']:                    lowest_price = pricing['lowest_prices'][0]['price']['amount']                    offer_count = pricing['lowest_prices'][0]['offer_count']                # 销售排名                sales_rank = None                if product['sales_rank']:                    sales_rank = product['sales_rank'][0]['rank']                # 添加到报告                analysis_report.append({                    'asin': product['identifiers']['asin'],                    'title': product['attributes']['title'],                    'brand': product['attributes']['brand'],                    'category': product['attributes']['category'],                    'price': product['attributes']['price']['amount'],                    'lowest_price': lowest_price,                    'offer_count': offer_count,                    'sales_rank': sales_rank,                    'is_parent': 'is_parent' in product['relationships'] and product['relationships']['is_parent']                })            # 输出分析结果(按销售排名排序)            print("\n===== 商品分析报告 =====")            # 按销售排名升序排序(排名越小越好)            analysis_report.sort(key=lambda x: x['sales_rank'] or float('inf'))            for i, item in enumerate(analysis_report, 1):                print(f"\n{i}. ASIN: {item['asin']}")                print(f"   标题: {item['title'][:50]}...")                print(f"   品牌: {item['brand']}")                print(f"   类别: {item['category']}")                print(f"   价格: {item['price']} USD")                print(f"   最低售价: {item['lowest_price']} USD")                print(f"   卖家数量: {item['offer_count']}")                print(f"   销售排名: {item['sales_rank'] or 'N/A'}")                print(f"   是否为变体父商品: {'是' if item['is_parent'] else '否'}")            # 找出潜在的优质商品(销售排名前20万,卖家数量较少)            potential_products = [                p for p in analysis_report                 if p['sales_rank'] and p['sales_rank'] < 200000                 and p['offer_count'] and p['offer_count'] < 10            ]            if potential_products:                print("\n===== 潜在优质商品 =====")                for p in potential_products:                    print(f"- {p['asin']}: {p['title'][:50]}... (排名: {p['sales_rank']}, 卖家数: {p['offer_count']})")        except Exception as e:            print(f"批量分析失败: {str(e)}")    if __name__ == "__main__":        batch_product_analysis_demo()

五、常见问题与优化建议 1. 常见错误码及解决方案 错误码 说明 解决方案 503 服务暂时不可用 实现指数退避重试机制,等待一段时间后重试 400 无效请求参数 检查 ASIN 格式、MarketplaceId 是否正确 401 认证失败 检查 Access Key、Secret Key 和签名是否正确 403 权限不足 确保已申请相应 API 的访问权限 405 方法不允许 确认使用正确的 HTTP 方法(GET/POST) 429 请求过于频繁 降低请求频率,确保不超过 QPS 限制 500 服务器内部错误 记录错误信息,稍后重试 2. 接口调用优化策略

   多站点处理:根据产品销售区域动态切换 Marketplace,减少跨区域调用    缓存分层:商品基本信息缓存 12-24 小时,价格数据缓存 5-15 分钟    批量处理:合理设置线程池大小,控制并发请求数量    增量更新:定期只更新有变化的商品数据,减少不必要的调用    异常处理:实现自动重试机制,对 503、429 等错误进行特殊处理

3. 数据处理特色技巧

   变体商品处理:区分父商品与子变体,构建完整的商品变体关系树    价格策略分析:结合历史价格数据,分析价格波动规律与促销周期    多市场对比:统一货币单位后进行跨市场价格与销量对比    销售排名解读:结合类目信息正确解读销售排名的实际意义    竞品识别:通过标题、品牌和类目信息识别核心竞争对手

若在 MWS API 对接中遇到 “签名排查”“变体解析”“站点切换” 等具体问题,可在评论区说明场景(如 “US 站 ASIN 解析不到变体”),将针对性分享解决方案 —— 跨境电商的技术落地,少踩一个坑就是多一份效率。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档