模拟实现一个ATM + 购物商城程序
1.额度 15000或自定义
4.支持多账户登录
5.支持账户间转账
9.提供管理接口,包括添加账户、用户额度,冻结账户等。。。
10.用户认证用装饰器
3.可以提现,手续费5%
2.实现购物商城,买东西加入 购物车,调用信用卡接口结账
6.记录每月日常消费流水
7.提供还款接口
8.ATM记录操作日志
包含随时返回:b 和随时退出:q
ps:使用面向过程的思想去实现
画的贼烂,框架如上图
ATM+购物车
|-------conf 配置加接口
| |------interface.py 接口配置
| |------settings.py 路径加日志配置
|
|-------core 核心 用户与超管
| |------admin.py 超管功能
| |------atm.py ATM功能
| |------shopping.py 购物功能
|
|-------db 数据库
| |------userdb 用户文件 json格式
| |------dbhandler.py 数据层操作
|
|-------lib 公共库
| |------common.py 用户认证加日志功能
|
|-------log 日志文件夹
| |------operation.log 操作日志
| |------trading.log 交易日志
|-------start.py 程序启动文件
#coding:utf-8
from db import dbhandler
from lib import common
logger1=common.get_logger("操作日志")
logger2=common.get_logger('交易日志')
def add_user(name,pwd='123',lines=15000):
'''添加账户接口'''
user_dict={'name':name,'password':pwd,'lines':lines,'balance':lines,'buy':[],'lock':False}
state,msg=dbhandler.query(name)
if state:
return False,'用户已存在'
else:
dbhandler.save(user_dict)
logger1.info('%s用户创建成功' % name)
return True, '%s用户创建成功,密码为%s' % (name,pwd)
def trans_interface(my_name,trans_name,money):
'''转账接口'''
state_my,msg_my=dbhandler.query(my_name)
state_tra,msg_tra=dbhandler.query(trans_name)
if state_tra:
if int(money) < int(msg_my['balance']):
msg_my['balance']=int(msg_my['balance'])-int(money)
msg_tra['balance']=int(msg_tra['balance'])+int(money)
dbhandler.save(msg_my)
dbhandler.save(msg_tra)
logger2.info('%s转账给%s%s元成功'%(my_name,trans_name,money))
return True,'%s转账给%s%s元成功'%(my_name,trans_name,money)
else:
return False,'转账失败,余额不足'
else:
return False,'转账用户不存在'
def withdrawal_interface(name,money):
'''提现接口'''
state,msg=dbhandler.query(name)
poundage=int(money)*5/100
if msg['balance'] < (int(money)+poundage):
return False,'提现失败,余额不足'
else:
msg['balance']=msg['balance'] - int(money) - poundage
dbhandler.save(msg)
logger2.info('%s提现%s元成功,手续费为%s元'%(name,money,poundage))
return True,'%s提现%s元成功,手续费为%s元'%(name,money,poundage)
def repay_interface(name,money):
'''还款接口'''
state,msg=dbhandler.query(name)
if state:
msg['balance']=msg['balance']+int(money)
dbhandler.save(msg)
logger2.info('%s还款%s元成功,余额为%s'%(name,money,msg['balance']))
return True,'%s还款%s元成功,余额为%s'%(name,money,msg['balance'])
def check_balance(name):
'''查看余额接口'''
state,user_dict=dbhandler.query(name)
res=user_dict['balance']
return res
def shopping_interface(name,cost,shopping_dict):
'''购物支付接口'''
state,user_dict=dbhandler.query(name)
if user_dict['balance'] > cost:
user_dict['balance']-=cost
user_dict['buy'].append(shopping_dict)
dbhandler.save(user_dict)
logger2.info('%s购买成功,消费了%s元'%(name,cost))
return True,'购买成功,消费了%s元'%cost
else:
return False,'余额不足,购买失败'
def check_shop_car(name):
'''查看购物记录即可'''
state,user_dict=dbhandler.query(name)
res=user_dict['buy']
return res
interface.py
# coding:utf-8
# ============================路径配置加日志配置项(日志配置模板)=========================================
import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
USER_DB = os.path.join(BASE_DIR, 'db', 'userdb')
standard_format = '[%(asctime)s][task_id:%(name)s]' \
'[%(levelname)s]< %(message)s >' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(asctime)s][%(levelname)s] %(message)s'
# log文件的路径
OPE_LOG_PATH = os.path.join(BASE_DIR, 'log', 'operation.log') # 操作日志路径
TRA_LOG_PATH = os.path.join(BASE_DIR, 'log', 'trading.log') # 交易日志路径
# log配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': id_simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志,使用的格式为 simple_format
'ch': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,使用的格式为 standard_format
'default1': {
'level': 'DEBUG',
'class': 'logging.FileHandler', # 保存到文件
'formatter': 'standard',
'filename': OPE_LOG_PATH, # 日志文件
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
'default2': {
'level': 'DEBUG',
'class': 'logging.FileHandler', # 保存到文件
'formatter': 'standard',
'filename': TRA_LOG_PATH, # 日志文件
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
'操作日志': {
'handlers': ['ch', 'default1'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': False,
},
'交易日志': {
'handlers': ['ch', 'default2'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': False,
},
},
}
# coding:utf-8
from conf import interface
from db import dbhandler
from lib import common
tag = True
def adduser():
'''添加用户'''
while True:
name = input('请输入账户名:').strip()
if not name: continue
name1 = input('请确认账户名:').strip()
if name != name1:
print('两次输入不一致!')
continue
state, msg = interface.add_user(name1)
if state:
print(msg)
break
else:
print(msg)
def set_lines():
'''设置额度'''
while True:
name = input('请输入账户名:').strip()
if not name: continue
name1 = input('请确认账户名:').strip()
if name != name1:
print('两次输入不一致!')
continue
lines = input('请输入信用额度:').strip()
if lines.isdigit():
state, msg = interface.dbhandler.query(name1)
if state:
msg['lines'] = lines
dbhandler.save(msg)
interface.logger1.info('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines))
print('%s设置%s成功,信用额度为%s!!' % ('admin', name1, lines))
else:
print(msg)
else:
print('额度请输入数字')
break
def freeze():
'''冻结功能'''
name = input('请输入冻结账户名:').strip()
name1 = input('请确认冻结账户名:').strip()
if name != name1:
print('两次输入不一致!')
return
state, msg = dbhandler.query(name1)
if state:
msg['lock'] = True
dbhandler.save(msg)
interface.logger1.info('%s冻结%s成功' % ('admin', name1))
print('%s冻结%s成功' % (common.user_state['name'], name1))
else:
print(msg)
def unfreeze():
'''解冻功能'''
name = input('请输入冻结账户名:').strip()
name1 = input('请确认冻结账户名:').strip()
if name != name1:
print('两次输入不一致!')
return
state, msg = dbhandler.query(name1)
if state:
msg['lock'] = False
dbhandler.save(msg)
interface.logger1.info('%s解冻%s成功' % ('admin', name1))
print('%s解冻%s成功' % ('admin', name1))
else:
print(msg)
def check_log():
'''查看操作日志功能'''
dbhandler.query_log('admin')
def menu():
view = '''
1.添加账户
2.设置用户额度
3.冻结账户
4.解冻账户
5.查看操作日志
q.退出
'''
print(view)
@common.login_auth(auth='admin')
def main():
view_dict = {
'1': adduser,
'2': set_lines,
'3': freeze,
'4': unfreeze,
'5': check_log
}
global tag
while tag:
print('管理员'.center(60, '-'))
menu()
choice = input('请选择:').strip()
if choice == 'q':
tag = False
break
if not choice or choice not in view_dict: continue
view_dict[choice]()
admin.py
# coding:utf-8
from core import admin
from conf import interface
from db import dbhandler
from lib import common
def registered():
'''注册功能'''
if common.user_state['name']:
print('已登录,无法注册')
return
while True:
print('注册'.center(40, '-'))
name = input('请输入账户名:').strip()
if not name: continue
pwd1 = input('请输入密码:').strip()
if not pwd1: continue
pwd2 = input('请确认密码:').strip()
if pwd1 != pwd2:
print('两次输入密码不一致!')
break
if len(pwd2) < 3:
print('密码强度太低')
continue
state, msg = interface.add_user(name, pwd2)
if state:
print(msg)
break
else:
print(msg)
def check_balance():
'''查看余额'''
balance = interface.check_balance(common.user_state['name'])
print('%s当前余额为%s元!!!' % (common.user_state['name'], balance))
def transfer():
'''转账'''
while True:
print('转账'.center(40, '-'))
name = input('请输入要转账的账户名:').strip()
name1 = input('请确认账户名:').strip()
if name != name1:
print('账户名不一致')
continue
money = input('请输入转账金额:'.strip())
if money.isdigit():
state, msg = interface.trans_interface(common.user_state['name'], name1, money)
if state:
print(msg)
break
else:
print(msg)
break
else:
print('金额请输入数字!')
continue
def repay():
'''还款'''
state, msg = dbhandler.query(common.user_state['name'])
arrears = msg['lines'] - msg['balance']
print('您的欠款为%s元!!!' % (arrears))
repay_wage = input('请输入您要还款的金额: ').strip()
if not repay_wage.isdigit():
print('请输入数字!')
return
else:
state, msg = interface.repay_interface(common.user_state['name'], repay_wage)
print(msg)
def withdrawal():
'''提现'''
while True:
print('提现'.center(40, '-'))
money = input('请输入要提现的金额:').strip()
if not money.isdigit(): continue
message = input('提现将要收取%5手续费,是否继续(y/n):').strip()
if message not in ['y', 'n']: continue
if message == 'y':
state, msg = interface.withdrawal_interface(common.user_state['name'], money)
if state:
print(msg)
break
else:
print(msg)
else:
break
def check_log():
'''查看操作日志'''
dbhandler.query_log(common.user_state['name'])
def view():
view_dict = '''
1.查询余额
2.转账
3.还款
4.提现
5.查询日志
b.返回上一层
q.退出
'''
print(view_dict)
@common.login_auth(auth='user')
def main():
view_dict = {
'1': check_balance,
'2': transfer,
'3': repay,
'4': withdrawal,
'5': check_log
}
while admin.tag:
print('ATM操作界面'.center(60, '-'))
view()
choice = input('请选择:').strip()
if choice == 'q':
admin.tag = False
break
if choice == 'b': break
if choice not in view_dict: continue
view_dict[choice]()
atm.py
# coding:utf-8
from lib import common
from core import admin
from conf import interface
shopping_dict = {} # 购物车
def product():
'''购物功能'''
product_list = [
['华为P20', 5488],
['笔记本', 5599],
['Nike运动鞋', 669],
['十三香小龙虾', 165],
['山地车', 399],
['天梭表', 3699],
['某宝t恤', 69],
['三只松鼠', 59],
['重庆小面', 18],
]
cost = 0 # 初始总花费为0
user_balance = interface.check_balance(common.user_state['name']) # 调接口查询余额
global shopping_dict
while admin.tag:
print('商品列表'.center(50, '-'))
for k, v in enumerate(product_list): # 将商品列表 加上索引打印出来
print(k, '', v)
print('q : 退出/购买 ')
choice = input('\n请选择购买商品:').strip()
if choice.isdigit(): # 判断输入是否为数字
choice = int(choice)
if choice >= len(product_list): continue # 输入的数字大于索引则重新开始
product_name = product_list[choice][0] # 拿到商品名
product_price = product_list[choice][1] # 拿到商品价格
if user_balance >= product_price: # 判断余额是否大于商品价格
if product_name in shopping_dict: # 判断商品是否已在购物车里
shopping_dict[product_name]['数量'] += 1 # 有则购物车的对应商品数量加1
else:
shopping_dict[product_name] = {'价格': product_price, '数量': 1} # 购物车添加新字典
user_balance -= product_price # 余额减去商品价格
cost += product_price # 花费加上商品价格
print('%s加入购物车成功' % [product_name, product_price])
else:
print('余额不足')
elif choice == 'q':
if cost == 0: break
for k, v in shopping_dict.items():
print(k, '', v)
buy = input('是否确认购买(y/n),需支付%s元:' % cost).strip()
if buy == 'y': # 确认支付时调支付接口
state, msg = interface.shopping_interface(common.user_state['name'], cost, shopping_dict)
if state:
print(msg)
break
else:
print(msg)
break
else:
print('无任何商品购买')
shopping_dict = {}
break
else:
print('输入错误')
continue
def shop_car():
'''查看购物车功能'''
for k, v in shopping_dict.items():
print(k, '', v)
def check_shop():
'''查看消费记录功能'''
shop_dic = interface.check_shop_car(common.user_state['name'])
for i in shop_dic:
print(i)
def view():
view_dict = '''
1.商品列表
2.购物车
3.查看消费记录
b.返回上一层
q.退出
'''
print(view_dict)
@common.login_auth(auth='user')
def main():
view_dict = {
'1': product,
'2': shop_car,
'3': check_shop,
}
while admin.tag:
print('购物界面'.center(60, '-'))
view()
choice = input('请选择:').strip()
if choice == 'q':
admin.tag = False
break
if choice == 'b': break
if choice not in view_dict: continue
view_dict[choice]()
shopping.py
#coding:utf-8
import json,os
from conf import settings
def save(user_dic):
'''数据写入'''
name=user_dic['name']
with open(settings.USER_DB+r'\%s.json'%name,'wt') as f:
json.dump(user_dic,f)
f.flush()
def query(name):
'''查询用户信息'''
db_path=settings.USER_DB+r'\%s.json'%name
if os.path.isfile(db_path):
with open(db_path,'rt') as f:
res=json.load(f)
return True,res
else:
return False, '用户不存在'
def query_log(name):
'''查询日志功能'''
with open(settings.OPE_LOG_PATH,'rt',encoding='utf-8') as f:
for names in f.readlines():
if name in names:
print(names)
with open(settings.TRA_LOG_PATH,'rt',encoding='utf-8') as f1:
for names in f1.readlines():
if name in names:
print(names)
dbhandler.py
#:coding:utf-8
user_state = {'name': None}
from conf import settings
import logging.config
from db import dbhandler
def login_user():
'''认证'''
count = 0
while True:
name = input('请输入用户名:').strip()
pwd = input('请输入密码:').strip()
state, msg = dbhandler.query(name)
if count == 3:
print('您已被锁定')
msg['lock'] = True
dbhandler.save(msg)
break
if state: # 查询状态
if pwd == msg['password'] and not msg['lock']:
user_state['name'] = name
print('%s登录成功' % name)
break
elif msg['lock'] == True:
print('您已被锁定')
break
else:
count += 1
print('密码错误%s次' % count)
else:
print(msg)
break
def login_admin():
'''管理员认证'''
admin_dict = {'name': 'admin', 'password': '123', 'lock': False}
name = input('请输入管理员账户:').strip()
pwd = input('请输入密码:').strip()
if name != admin_dict['name'] or pwd != admin_dict['password']:
print('账号或密码错误')
else:
print('登录成功')
user_state['name'] = name
def login_auth(auth='module'):
def outter(func):
'''装饰器'''
def warppers(*args, **kwargs):
if auth == 'user':
if user_state['name'] == None:
login_user()
if user_state['name']:
func(*args, **kwargs)
if auth == 'admin':
if user_state['name'] == None:
login_admin()
if user_state['name'] == 'admin':
func(*args, **kwargs)
else:
print('没有管理员权限!')
return
return warppers
return outter
def get_logger(name):
'''日志模块'''
logging.config.dictConfig(settings.LOGGING_DIC)
logger = logging.getLogger(name)
return logger
common.py
# coding:utf-8
# ================================start.py=======================================
import sys, os
BASE_DIR = os.path.dirname(__file__)
sys.path.append(BASE_DIR)
from core import atm, shopping, admin
def menu():
view = '''
1.注册
2.ATM
3.购物
4.管理员
q.退出
'''
print(view)
if __name__ == '__main__':
view_dict = {
'1': atm.registered,
'2': atm.main,
'3': shopping.main,
'4': admin.main
}
while admin.tag:
print('总界面'.center(60, '-'))
menu()
choice = input('请选择:').strip()
if choice == 'q': break
if not choice or choice not in view_dict: continue
view_dict[choice]()
花了点时间将之前的面向过程编程作业重写了次,完成后感觉还是写的不够好。时间原因就先这样。