我正在通过NI开发的nidaqmx模块构建一些脚本例程。我使用2个NI PXI 44-98 (32通道)采集卡。
我们希望开发一个长期(10小时)的监测实验,采样率为200k Hz。
目前,我正在与python的内在限制和Nidaqmx模块的逻辑作斗争。
到目前为止,我已经用有限数量的传感器编写了一个连续采集例程。
import nidaqmx
from nidaqmx.constants import AcquisitionType
import matplotlib.pyplot as plt
import numpy as np
sample_time = 600 # units = seconds
s_freq = 200000
num_samples = sample_time*s_freq
dt = 1/s_freq
print('go acquisition !')
with nidaqmx.Task() as task:
task.ai_channels.add_ai_accel_chan("PXI1Slot2_3/ai1:3",
sensitivity=10000.0,
max_val=1,
min_val=-1)
task.timing.cfg_samp_clk_timing(s_freq,
sample_mode = AcquisitionType.CONTINUOUS)
data = task.read(number_of_samples_per_channel=num_samples, timeout = nidaqmx.constants.WAIT_INFINITELY)
print('I do it right !')但是使用这个非常简单的例程,我不能记录超过10分钟的监控。python的内存不足以支持它。这对我来说是完全合乎逻辑的。
我在NI网站上检查了缓冲区逻辑,但我不清楚如何在这里实现它……
我不明白如何在这个小例程中适应任务记录的每X MB数据一次写入磁盘,同时仍然监视和清空" data“目录以避免溢出,并且我在stackoverflow上没有看到我的情况下的一些正确答案。
如果你已经遇到这个问题,并且你有解决方案,我很感兴趣。
感谢您的阅读
发布于 2021-09-23 00:12:18
我不知道你是解决了你的问题,还是停止了尝试,无论哪种方式,这都是一个基本的解决方案。我使用的是以太网控制的NI 9234卡,我使用csv存储数据:
# This works for a NI9234 card
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime
sample_rate = 2048 # Usually sample rate goes 2048 Hz, 2560 Hz, 3200 Hz, 5120 Hz, 6400 Hz (at least for a NI9234 card)
samples_to_acq = 2048 # At least for vibration matters, Samples to acq go from 2048, 4096, 8192
wait_time = samples_to_acq/sample_rate # Data Acquisition Time in s, very important for the NI task function, and since we have 2048 on both sides, time is 1 s
cont_mode = AcquisitionType.CONTINUOUS # There is also FINITE for sporadic measurements
iterations = 10
with nidaqmx.Task() as task:
now = datetime.now()
military = now.strftime('%H:%M:%S') # Just recording the time like 20:32:54 instead of 8:32:54 pm
first_header = ['Some title in here']
second_header = [f'T. Captura: {military}']
# Create accelerometer channel and configure sample clock. current_excit_val=0.002 enables IEPE, velocity measured in ips and accel in m/s2,
# both prox and tachometer can be used with a simple voltage channel.
# For more info about channels: https://nidaqmx-python.readthedocs.io/en/latest/ai_channel_collection.html
# Two accelerometer channels
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai0", sensitivity = 100, current_excit_val = 0.002)
task.ai_channels.add_ai_accel_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai1", sensitivity = 100, current_excit_val = 0.002)
# Two voltage channels
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai2")
task.ai_channels.add_ai_voltage_chan(physical_channel = "cDAQ9181-1E3866DMod1/ai3")
total_wait_time = wait_time * iterations # We will only take 10 measurements, 10 s for this example
samples_to_acq_new = samples_to_acq * iterations # Also multiply by 10 to keep the same ratio, it should be
# Sets source of sample clock, its rate, and number of samples to aquire = buffer size
task.timing.cfg_samp_clk_timing(sample_rate, sample_mode = cont_mode, samps_per_chan = samples_to_acq_new)
start = time.time()
print ('Starting task...') # Just to keep a control in your console
# Saving the 4 channels to a csv file. This file is overwritten everytime the program is executed.
# It should appear in the same folder that your program is located.
with open('data_10times.csv', 'w', newline = '') as f:
writer = csv.writer(f)
writer.writerow(first_header)
writer.writerow(second_header)
# adding some blank spaces in btw
writer.writerow('')
writer.writerow('')
x = np.linspace(0, total_wait_time, samples_to_acq_new) # Your x axis (ms), starts from 0, final time is total_wait_time, equally divided by the number of samples you'll capture
data = np.ndarray((4, samples_to_acq_new), dtype = np.float64) #Creates an array, 4 columns each one of 20480 rows
nidaqmx.stream_readers.AnalogMultiChannelReader(task.in_stream).read_many_sample(data, samples_to_acq_new, timeout = 14) # it should't take that long for this example, check out time for other exercises
for value in range(len(x)):
writer.writerow([x[value], data[0][value], data[1][value], data[2][value], data[3][value]])
elapsed_time = (time.time() - start)
print (f'done in {elapsed_time}')所以..。5个月后,但我希望它能有所帮助。
https://stackoverflow.com/questions/66984522
复制相似问题