首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >National Instrument Long Monitoring Python

National Instrument Long Monitoring Python
EN

Stack Overflow用户
提问于 2021-04-07 18:47:41
回答 1查看 122关注 0票数 1

我正在通过NI开发的nidaqmx模块构建一些脚本例程。我使用2个NI PXI 44-98 (32通道)采集卡。

我们希望开发一个长期(10小时)的监测实验,采样率为200k Hz。

目前,我正在与python的内在限制和Nidaqmx模块的逻辑作斗争。

到目前为止,我已经用有限数量的传感器编写了一个连续采集例程。

代码语言:javascript
复制
import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np

sample_time = 600  # units = seconds
s_freq = 200000
num_samples = sample_time*s_freq
dt = 1/s_freq
print('go acquisition !')
with nidaqmx.Task() as task:
    task.ai_channels.add_ai_accel_chan("PXI1Slot2_3/ai1:3",
                                       sensitivity=10000.0,
                                       max_val=1,
                                       min_val=-1)
    task.timing.cfg_samp_clk_timing(s_freq,
                                   sample_mode = AcquisitionType.CONTINUOUS)
    data = task.read(number_of_samples_per_channel=num_samples, timeout = nidaqmx.constants.WAIT_INFINITELY)
print('I do it right !')

但是使用这个非常简单的例程,我不能记录超过10分钟的监控。python的内存不足以支持它。这对我来说是完全合乎逻辑的。

我在NI网站上检查了缓冲区逻辑,但我不清楚如何在这里实现它……

我不明白如何在这个小例程中适应任务记录的每X MB数据一次写入磁盘,同时仍然监视和清空" data“目录以避免溢出,并且我在stackoverflow上没有看到我的情况下的一些正确答案。

如果你已经遇到这个问题,并且你有解决方案,我很感兴趣。

感谢您的阅读

EN

回答 1

Stack Overflow用户

发布于 2021-09-23 00:12:18

我不知道你是解决了你的问题,还是停止了尝试,无论哪种方式,这都是一个基本的解决方案。我使用的是以太网控制的NI 9234卡,我使用csv存储数据:

代码语言:javascript
复制
# This works for a NI9234 card
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime

sample_rate = 2048                              # Usually sample rate goes 2048 Hz, 2560 Hz, 3200 Hz, 5120 Hz, 6400 Hz (at least for a NI9234 card) 
samples_to_acq = 2048                           # At least for vibration matters, Samples to acq go from 2048, 4096, 8192
wait_time = samples_to_acq/sample_rate          # Data Acquisition Time in s, very important for the NI task function, and since we have 2048 on both sides, time is 1 s
cont_mode = AcquisitionType.CONTINUOUS              # There is also FINITE for sporadic measurements
iterations = 10
    
with nidaqmx.Task() as task:
    now = datetime.now()
    military = now.strftime('%H:%M:%S')     # Just recording the time like 20:32:54 instead of 8:32:54 pm
    first_header = ['Some title in here']
    second_header = [f'T. Captura: {military}']

# Create accelerometer channel and configure sample clock. current_excit_val=0.002 enables IEPE, velocity measured in ips and accel in m/s2, 
# both prox and tachometer can be used with a simple voltage channel.
# For more info about channels: https://nidaqmx-python.readthedocs.io/en/latest/ai_channel_collection.html

# Two accelerometer channels
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai0", sensitivity = 100, current_excit_val = 0.002)
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai1", sensitivity = 100, current_excit_val = 0.002)

# Two voltage channels
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai2")
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai3")

total_wait_time = wait_time * iterations                         # We will only take 10 measurements, 10 s for this example
samples_to_acq_new = samples_to_acq * iterations                 # Also multiply by 10 to keep the same ratio, it should be 

# Sets source of sample clock, its rate, and number of samples to aquire = buffer size
task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = cont_mode, samps_per_chan = samples_to_acq_new)               
  
start = time.time()
print ('Starting task...')         # Just to keep a control in your console

# Saving the 4 channels to a csv file. This file is overwritten everytime the program is executed.
# It should appear in the same folder that your program is located. 
with open('data_10times.csv', 'w', newline = '') as f:
    writer = csv.writer(f)
    writer.writerow(first_header)
    writer.writerow(second_header)
    
    # adding some blank spaces in btw
    writer.writerow('')
    writer.writerow('')        
    
    x = np.linspace(0, total_wait_time, samples_to_acq_new)       # Your x axis (ms), starts from 0, final time is total_wait_time, equally divided by the number of samples you'll capture
    
    data = np.ndarray((4, samples_to_acq_new), dtype = np.float64)  #Creates an array, 4 columns each one of 20480 rows
    nidaqmx.stream_readers.AnalogMultiChannelReader(task.in_stream).read_many_sample(data, samples_to_acq_new, timeout = 14) # it should't take that long for this example, check out time for other exercises               
    
    for value in range(len(x)):
        writer.writerow([x[value], data[0][value], data[1][value], data[2][value], data[3][value]])

elapsed_time = (time.time() - start)
print (f'done in {elapsed_time}')

所以..。5个月后,但我希望它能有所帮助。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66984522

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档