前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【python项目推荐】键盘监控--统计打字频率

【python项目推荐】键盘监控--统计打字频率

作者头像
一只大鸽子
发布2024-04-26 09:42:48
640
发布2024-04-26 09:42:48
举报

原文:https://greptime.com/blogs/2024-03-19-keyboard-monitoring 代码:https://github.com/GreptimeTeam/demo-scene/tree/main/keyboard-monitor

项目简介

该项目实现了打字频率统计及可视化功能。

项目结果

主要使用的库

pynput：用于控制和监听输入设备，这里用来获取键盘输入。
SQLAlchemy：数据库操作库，这里用来保存键盘输入。
streamlit：提供可视化界面。

项目组成

代码语言:javascript
复制
agent.py :获得键盘输入
display.py:可视化

补充说明

如果你不想使用原文的数据库（GreptimeDB），也可以替换为本地数据库，例如免安装的 SQLite（下面的代码使用的就是 SQLite）。

agent.py

代码语言:python
复制
# agent.py
from dotenv import load_dotenv
from pynput import keyboard
from pynput.keyboard import Key

import concurrent.futures
import logging
import os
import queue
import sqlalchemy
import sqlalchemy.exc
import sys
import time


# Keys treated as modifiers: while held they are tracked in
# `current_modifiers` rather than being recorded as hits of their own.
MODIFIERS = {
    # Shift
    Key.shift,
    Key.shift_l,
    Key.shift_r,
    # Alt / AltGr
    Key.alt,
    Key.alt_l,
    Key.alt_r,
    Key.alt_gr,
    # Ctrl
    Key.ctrl,
    Key.ctrl_l,
    Key.ctrl_r,
    # Cmd
    Key.cmd,
    Key.cmd_l,
    Key.cmd_r,
}

# Table handle used to build the INSERT statements: one row per recorded
# hit, with the key (or key combination) text and the time it was stored.
TABLE = sqlalchemy.Table(
    'keyboard_monitor',
    sqlalchemy.MetaData(),
    sqlalchemy.Column('hits', type_=sqlalchemy.String),
    sqlalchemy.Column('ts', type_=sqlalchemy.DateTime),
)


if __name__ == '__main__':
    load_dotenv()

    # The "agent" logger writes everything (DEBUG and up) to a per-run log
    # file and echoes INFO and up to stdout, both with the same format.
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(funcName)s %(message)s')
    log = logging.getLogger("agent")
    log.setLevel(logging.DEBUG)
    log_targets = (
        (logging.FileHandler(f'agent-{time.time_ns()}.log', encoding='utf-8'), logging.DEBUG),
        (logging.StreamHandler(sys.stdout), logging.INFO),
    )
    for handler, level in log_targets:
        handler.setLevel(level)
        handler.setFormatter(formatter)
        log.addHandler(handler)

    # Previous configuration, which took the database from the environment
    # (kept for reference):
    #   engine = sqlalchemy.create_engine(os.environ['DATABASE_URL'],
    #                                     echo_pool=True,
    #                                     isolation_level='AUTOCOMMIT')
    # This variant writes to a local SQLite file instead.
    engine = sqlalchemy.create_engine("sqlite:///keyboard.db")

    # State shared between the keyboard callbacks and the worker threads.
    pending_hits = queue.Queue()   # hits waiting to be stored; None tells the sender to stop
    cancel_signal = queue.Queue()  # any item tells the listener to stop
    current_modifiers = set()      # modifier keys currently held down

    def on_press(key):
        """Handle a key-press event.

        A modifier key is remembered as held. Any other key is queued on
        ``pending_hits`` as a string: the sorted names of the modifiers
        currently held, followed by the key itself, joined with ``+``.
        """
        if key in MODIFIERS:
            current_modifiers.add(key)
        else:
            held = sorted(str(modifier) for modifier in current_modifiers)
            pending_hits.put('+'.join([*held, str(key)]))
        log.debug(f'{key} pressed, current_modifiers: {current_modifiers}')

    def on_release(key):
        """Handle a key-release event.

        A released modifier is removed from ``current_modifiers``; a warning
        is logged if it was not recorded as held. Other keys are only logged.
        """
        if key in MODIFIERS:
            if key in current_modifiers:
                current_modifiers.remove(key)
            else:
                log.warning(f'Key {key} not in current_modifiers {current_modifiers}')
        log.debug(f'{key} released, current_modifiers: {current_modifiers}')

    # Table definition used with the original GreptimeDB backend, kept for
    # reference:
    #
    #     with engine.connect() as connection:
    #         connection.execute(sqlalchemy.sql.text("""
    #             CREATE TABLE IF NOT EXISTS keyboard_monitor (
    #                 hits STRING NULL,
    #                 ts TIMESTAMP(3) NOT NULL,
    #                 TIME INDEX ("ts")
    #             ) ENGINE=mito WITH( regions = 1, ttl = '3months')
    #         """))
    #
    # For the SQLite backend the table is created through SQLAlchemy instead.
    # The already-imported `sqlalchemy` module is used directly rather than a
    # second, mid-script import.
    metadata = sqlalchemy.MetaData()
    keyboard_monitor = sqlalchemy.Table(
        'keyboard_monitor', metadata,
        sqlalchemy.Column('hits', sqlalchemy.String, nullable=True),
        sqlalchemy.Column('ts', sqlalchemy.TIMESTAMP, nullable=False),
    )

    # Creates the table only if it does not exist yet.
    metadata.create_all(engine)

   


    def sender_thread():
        """Drain ``pending_hits`` and insert every entry into the database.

        Runs until the ``None`` sentinel is dequeued. Each hit is inserted on
        its own connection with the database's current time as ``ts``.

        Operational errors recognised as transient are retried: the hit is
        put back on the queue and the thread pauses briefly before going on.
        The retry counter tracks *consecutive* failures only and is reset by
        every successful insert.

        Raises:
            sqlalchemy.exc.OperationalError: for an unrecognised operational
                error, or when more than ``max_retries`` inserts fail in a row.
        """
        max_retries = 10
        retries = 0  # consecutive failed attempts

        def transient_reason(error):
            """Return a log label if *error* is worth retrying, else None."""
            if error.connection_invalidated:
                return 'Connection invalidated'
            msg = str(error)
            if "(1815, 'Internal error: 1000')" in msg:
                # TODO 1815 - should not handle internal error;
                # see https://github.com/GreptimeTeam/greptimedb/issues/3447
                return 'Known operational error'
            if '2005' in msg and 'Unknown MySQL server host' in msg:
                return 'DNS temporary unresolved'
            return None

        while True:
            hits = pending_hits.get()
            log.debug(f'got: {hits}')
            if hits is None:
                log.info("Exiting...")
                break
            try:
                # Connecting happens inside the try so that connect-time
                # failures go through the same retry logic as failed inserts.
                with engine.connect() as connection:
                    log.debug(f'sending: {hits}')
                    connection.execute(TABLE.insert().values(hits=hits, ts=sqlalchemy.func.now()))
                    connection.commit()
            except sqlalchemy.exc.OperationalError as e:
                retries += 1
                if retries > max_retries:
                    # Give up instead of re-queueing forever.
                    log.error(f'Retry exceeds. Operational error: {e}')
                    raise
                reason = transient_reason(e)
                if reason is None:
                    raise
                log.warning(f'{reason}: {e}')
                # NOTE: the hit goes to the back of the queue, so it is stored
                # after newer hits, and is dropped if the stop sentinel is
                # already queued ahead of it.
                pending_hits.put(hits)
                # Back off a little so a persistent failure does not spin.
                time.sleep(min(0.5 * retries, 5.0))
            else:
                log.info(f'sent: {hits}')
                retries = 0

    def listener_thread():
        """Run the keyboard listener until a cancel signal arrives.

        When an item appears on ``cancel_signal``, a ``None`` sentinel is put
        on ``pending_hits`` (which makes the sender loop exit) and the
        listener context is left.
        """
        hook = keyboard.Listener(on_press=on_press, on_release=on_release)
        with hook:
            log.info("Listening...")
            cancel_signal.get()  # blocks until something is put on the queue
            pending_hits.put(None)
            log.info("Exiting...")

    # Start both workers, block until one of them raises (or both finish),
    # report what finished, and always signal the listener to stop.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        workers = [
            executor.submit(sender_thread),
            executor.submit(listener_thread),
        ]
        try:
            finished, _ = concurrent.futures.wait(
                workers,
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            for fut in finished:
                log.error(f'Unhandled exception for futures: {fut.exception(timeout=0)}')
        except KeyboardInterrupt:
            log.info("KeyboardInterrupt. Exiting...")
        except Exception as e:
            log.error(f'Unhandled exception: {e}')
        finally:
            cancel_signal.put(True)

display.py

代码语言:python
复制
# display.py
import datetime
import os
from dotenv import load_dotenv
import pytz
import streamlit as st
import tzlocal
import pandas

st.title("Keyboard Monitor")

load_dotenv()

# Same SQLite file that agent.py writes to.
conn = st.connection('keyboard', type='sql', url="sqlite:///keyboard.db")

# conn.query() caches results without expiry unless a ttl is given, which
# would freeze the dashboard at its first reading; refresh every few seconds.
df = conn.query("SELECT COUNT(*) AS total_hits FROM keyboard_monitor", ttl=10)
st.metric("Total hits", df.total_hits[0])

# Single most frequent plain key (no '+') and key combination (contains '+').
# Either query can return no rows (for example before any combination has
# been recorded), so fall back to a placeholder instead of indexing an empty
# frame. A short ttl keeps the cached results from going stale.
most_frequent_key, most_frequent_combo = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""", ttl=10)
most_frequent_key.metric("Most frequent key", "N/A" if df.empty else df.hits.iloc[0])
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 1;
""", ttl=10)
most_frequent_combo.metric("Most frequent combo", "N/A" if df.empty else df.hits.iloc[0])

# Side-by-side tables of the ten most frequent plain keys (no '+') and key
# combinations (contains '+'). A short ttl is passed because conn.query()
# otherwise caches results without expiry and the tables would never update.
top_frequent_keys, top_frequent_combos = st.columns(2)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits NOT LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""", ttl=10)
top_frequent_keys.subheader("Top 10 keys")
top_frequent_keys.dataframe(df)
df = conn.query("""
SELECT hits, COUNT(*) as times
FROM keyboard_monitor
WHERE hits LIKE '%+%'
GROUP BY hits
ORDER BY times DESC limit 10;
""", ttl=10)
top_frequent_combos.subheader("Top 10 combos")
top_frequent_combos.dataframe(df)

st.header("Find your inputs frequency of day")
local_tz = tzlocal.get_localzone()
d = st.date_input("Pick a day:", value=datetime.date.today())

# Hourly hit counts for the picked day.
# - The day is passed as a bound parameter instead of being formatted into
#   the SQL text.
# - Both the day filter and the hourly bucket use 'localtime', so buckets
#   line up with local hours (assumes ts is stored in UTC -- TODO confirm).
# - MIN(ts) gives each bucket a deterministic timestamp (its first hit).
# - No LIMIT: a day has at most a couple of dozen hourly buckets and all of
#   them should be shown.
query = """
SELECT
    MIN(ts) AS ts,
    COUNT(1) AS times
FROM keyboard_monitor
WHERE strftime('%Y-%m-%d', ts, 'localtime') = :day
GROUP BY strftime('%Y-%m-%d %H:00:00', ts, 'localtime')
ORDER BY ts ASC;
"""

# A short ttl keeps today's counts from being served from a stale cache.
df = conn.query(query, params={"day": d.isoformat()}, ttl=10)
# Stored timestamps are treated as UTC and shown in the local time zone.
df['ts'] = pandas.to_datetime(df['ts'])
df['ts'] = df['ts'].dt.tz_localize(pytz.utc).dt.tz_convert(local_tz)
st.dataframe(df)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-04-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一只大鸽子 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 项目简介
    • 主要使用的库
      • 项目组成
      • 补充说明
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档