我使用Django视图,就像一个没有gui的流引擎。有人能以编程的方式发布创建流程和流程管理的示例吗?我不明白如何在没有前端的情况下完全从django代码(例如来自测试类)来管理流。我需要先创建一个流实例吗?我如何知道我必须执行哪项任务,以及如何执行?我只需要使用没有gui的视频流
提前感谢!
MyApp/models.py
class MedicalParameters(models.Model):
# medical parameters
pas = models.IntegerField(verbose_name="Pressione Sistolica")
pad = models.IntegerField(verbose_name="Pressione Diastolica")
fc = models.IntegerField(verbose_name="Frequenza Cardiaca")
class Triage(models.Model):
date = models.DateTimeField(auto_now=True)
patient_id = models.CharField(max_length=20)
first_name = models.CharField(max_length=150)
last_name = models.CharField(max_length=150)
birth_date = models.DateField(auto_now=False)
sex = models.CharField(max_length=1, choices=SEX, default='M')
# Medical Parameters
parameters = models.ForeignKey(MedicalParameters, blank=True, null=True,
on_delete=models.PROTECT)
class TriageProcess(Process):
triage = models.ForeignKey(Triage, blank=True, null=True, on_delete=models.CASCADE)
class Meta:
verbose_name_plural = 'Triage process'MyApp/flow.py.
class TriageFlow(Flow):
process_class = TriageProcess
process_title = 'Processo di Triage'
process_description = 'Processo di Triage'
summary_template = """
Triage di {{ process.triage.first_name }} {{ process.triage.last_name }}
"""
start = (
flow.Start(
views.StartView,
task_title="Nuovo Triage",
task_description="Inizia Triege"
).Permission(
auto_create=True
).Next(this.register_measures)
)
register_measures = (
flow.View(
views.MeasuresView,
# fields=["parameters"],
task_description="Acquisisci Misure",
task_title='Misure da Multiparametrico'
)
.Assign(lambda act: act.process.created_by)
.Next(this.choose_capitolo)
)MyApp/view.py
class StartView(StartFlowMixin, generic.UpdateView):
form_class = TriageForm
layout = Layout(
Row('patient_id'),
Fieldset('Patient Details',
Row('first_name', 'last_name', 'birth_date'),
Row('sex',
# 'age'
)
)
)
def get_object(self):
return self.activation.process.triage
def activation_done(self, form):
triage = form.save()
self.activation.process.triage = triage
self.activation.process.triage.color = COLOR_VALUE.BIANCO
super(StartView, self).activation_done(form)
# super(StartView, self).activation_done(form)
class MeasuresView(FlowMixin, generic.UpdateView):
form_class = MedicalParametersForm
layout = Layout(
Fieldset('Temperatura ( C )',
Row('temp')),
Fieldset('Pressione',
Row('pas'),
Row('pad')),
Fieldset('Frequenza',
Row('fc'),
Row('fr'),
Row('fio2')),
Fieldset("Analisi Cliniche",
Row('so2'),
Row('ph')),
Fieldset('Emogas',
Row('pao2'),
Row('paco2'),
Row('hco3')),
Fieldset("Indici",
Row('gcs')
# Row('shock')
))
def get_object(self):
return self.activation.process.triage.parameters
def activation_done(self, form):
_measures = form.save()
self.activation.process.triage.parameters = _measures
if not self.activation.process.triage.parameters.fc is None \
and not self.activation.process.triage.parameters.pas is None:
self.activation.process.triage.parameters.shock = self.activation.process.triage.parameters.fc / self.activation.process.triage.parameters.pas
self.activation.process.triage.parameters.save()
color = _measures.calculate_color()
self.activation.process.triage.color = color
self.activation.process.triage.rivalutazione = None
self.activation.process.triage.save()
super(MeasuresView, self).activation_done(form)发布于 2020-03-10 07:29:41
测试视图作为流的一部分,限制了您在测试中所能做的事情。例如,在特定视图的相同流中添加模板和模板变量的测试变得非常麻烦。
如果你要做彻底的测试。你的测试会爆炸到一个令人不快的程度。
要绕过每个视图都要求完成之前的任务这一事实。您可以使用工厂男孩创建与视图关联的特定流任务。并使用post生成钩子运行必要的激活,这意味着您可以像测试中的其他普通django视图一样调用视图。
flows.py
from viewflow import flow
from viewflow.base import Flow, this
from .views import SampleCreateView, SampleUpdateViewOne, SampleUpdateViewTwo
class SampleFlow(Flow):
start = flow.Start(SampleCreateView).Next(this.update_one)
update_one = flow.View(SampleUpdateViewOne).Next(this.update_two)
update_two = flow.View(SampleUpdateViewTwo).Next(this.end)
end = flow.End()测试/factories.py
class TaskFactory(factory.django.DjangoModelFactory):
class Meta:
model = Task
process = factory.SubFactory(SampleProcessFactory)
flow_task = SampleFlow.start
owner = factory.SubFactory(UserFactory)
token = 'START'
@factory.post_generation
def run_activations(self, create, extracted, **kwargs):
activation = self.activate()
if hasattr(activation, 'assign'):
activation.assign()测试/测试视图
class TestSampleFlowUpdateViewTwo(TestCase):
def setUp(self):
self.process = SampleProcessFactory()
self.task_owner = UserFactory()
self.task = TaskFactory(process=self.process,
flow_task=SampleFlow.update_two, owner=self.task_owner)
self.url = reverse('unittest_viewflow:sampleflow:update_two',
kwargs={'process_pk': self.process.pk, 'task_pk': self.task.pk})
def test_get(self):
self.client.force_login(self.task_owner)
response = self.client.get(self.url)
self.assertTrue(response.status_code, 200)
def test_post(self):
self.client.force_login(self.task_owner)
data = {'_viewflow_activation-started': '1970-01-01', 'update_two': 'Update Two'}
response = self.client.post(self.url, data=data)
self.assertEqual(Task.objects.get(pk=self.task.pk).status, 'DONE')有关更多信息,您可以查看此存储库
发布于 2017-06-22 08:14:58
要测试TestClass中的流,可以像往常一样使用django TestClient。只需重复在浏览器中手动执行的相同步骤。
您可以检查HelloWorld演示测试- https://github.com/viewflow/cookbook/blob/master/helloworld/demo/tests.py的示例。
class Test(TestCase):
def setUp(self):
User.objects.create_superuser('admin', 'admin@example.com', 'password')
self.client.login(username='admin', password='password')
def testApproved(self):
self.client.post(
'/workflow/helloworld/helloworld/start/',
{'text': 'Hello, world',
'_viewflow_activation-started': '2000-01-01'}
)
self.client.post(
'/workflow/helloworld/helloworld/1/approve/2/assign/'
)
self.client.post(
'/workflow/helloworld/helloworld/1/approve/2/',
{'approved': True,
'_viewflow_activation-started': '2000-01-01'}
)
process = Process.objects.get()
self.assertEquals('DONE', process.status)
self.assertEquals(5, process.task_set.count())发布于 2018-05-15 07:20:34
请参阅这个答案,它展示了如何在正常的“手动”起点旁边添加编程起点:
class MyRunFlow(flow.Flow):
process_class = Run
start = flow.Start(ProcessCreate, fields=['schedule']). \
Permission(auto_create=True). \
Next(this.wait_data_collect_start)
start2 = flow.StartFunction(process_create). \
Next(this.wait_data_collect_start)注意,重要的是process_create有Process对象,这段代码必须以编程方式设置与手动表单提交通过字段规范到ProcessCreate所做的相同的字段。
@flow_start_func
def process_create(activation: FuncActivation, **kwargs):
#
# Update the database record.
#
db_sch = Schedule.objects.get(id=kwargs['schedule'])
activation.process.schedule = db_sch # <<<< Same fields as ProcessCreate
activation.process.save()
#
# Go!
#
activation.prepare()
with Context(propagate_exception=False):
activation.done()
return activation请注意,一旦以编程方式启动流,则序列中的任何非手动任务都会自动执行。
在我请在此描述的一系列非手动任务中没有提到一个关于错误处理的重要警告,并且我给出了一个部分的答案(我不知道完整的答案,这就是为什么发布这个问题!);这里是with Context()部分的原因。
对原始线程的第一个回答 by @kmmbvnr还包含关于如何随后以编程方式操作任务的提示。因此,当流程到达手动任务时,您可以分配它,依此类推。
https://stackoverflow.com/questions/44674513
复制相似问题