

2024 年初,我开发了一个道路巡检 App,功能简单:车载终端启动后,调用本地摄像头识别路面病害,并上传结果到云端。一切逻辑都运行在单一设备上——典型的 单体式应用架构。
问题很快暴露:
团队提出新方案:引入无人机作为移动感知节点,由车机发起任务,无人机执行巡检,结果回传融合分析。这要求系统具备 跨设备任务分发、状态同步、容错恢复 能力——远超传统 App 范畴。

于是,我开始构建一个 轻量级分布式任务调度引擎,完全基于 HarmonyOS 原生能力实现。
我们采用 “调度层 - 执行层 - 数据层” 三层架构:
[ 车载终端 ] ——(DSoftBus)——> [ 无人机 ]
| |
v v
[ 任务调度器 ] <——(DDM)——> [ 分布式事件存储 ]首先,定义统一的任务结构,确保跨设备可解析:
// TaskModel.ets
export interface RoadInspectionTask {
taskId: string;
type: 'aerial_scan' | 'ground_check';
targetArea: {
centerLat: number;
centerLng: number;
radius: number; // 米
};
priority: 'high' | 'normal';
createdAt: number;
status: 'pending' | 'running' | 'completed' | 'failed';
executorDeviceId?: string; // 执行设备 networkId
}为支持跨进程/跨设备传输,我们使用 JSON 序列化 + 校验:
// TaskSerializer.ets
export class TaskSerializer {
static serialize(task: RoadInspectionTask): string {
return JSON.stringify(task);
}
static deserialize(jsonStr: string): RoadInspectionTask | null {
try {
const obj = JSON.parse(jsonStr);
if (!obj.taskId || !obj.type) return null;
return obj as RoadInspectionTask;
} catch {
return null;
}
}
}车机通过 deviceManager 发现可用无人机,并分发任务:
// TaskDispatcher.ets
import deviceManager from '@ohos.distributedHardware.deviceManager';
import rpc from '@ohos.rpc';
class TaskDispatcher {
private droneNetworkId: string | null = null;
async discoverAndBindDrone(): Promise<boolean> {
const devices = await deviceManager.getAvailableDeviceList();
const drone = devices.find(d => d.deviceName?.includes('Drone'));
if (drone) {
this.droneNetworkId = drone.networkId;
return true;
}
return false;
}
async dispatchTask(task: RoadInspectionTask): Promise<boolean> {
if (!this.droneNetworkId) {
console.error('No drone bound');
return false;
}
try {
// 连接远程服务
const remoteObj = await rpc.connectRemoteObject(
this.droneNetworkId,
'com.example.drone.InspectionService'
);
// 调用远程方法
const result = await remoteObj.sendRequest(
1001, // 自定义 method ID
TaskSerializer.serialize(task)
);
return result.resultCode === 0;
} catch (err) {
console.error('Dispatch failed:', err);
return false;
}
}
}在无人机端,我们实现一个 InspectionService,继承自 rpc.RemoteObject:
// InspectionService.ets (运行在无人机)
import rpc from '@ohos.rpc';
import taskRunner from '@ohos.taskRunner';
class InspectionService extends rpc.RemoteObject {
constructor() {
super('InspectionService');
}
onRemoteRequest(code: number, data: rpc.MessageSequence): boolean {
if (code === 1001) {
const taskJson = data.readString();
const task = TaskSerializer.deserialize(taskJson);
if (task) {
// 异步执行任务(避免阻塞主线程)
taskRunner.execute(() => {
this.executeInspection(task);
});
// 回写成功响应
const reply = new rpc.MessageSequence();
reply.writeInt(0); // resultCode
data.reply(reply);
return true;
}
}
return false;
}
private async executeInspection(task: RoadInspectionTask) {
// 1. 控制云台对准目标区域
await GimbalController.aimAt(task.targetArea);
// 2. 启动摄像头 + AI 推理
const frames = Camera.captureFrames(5);
const risks = await AIModel.detectRisks(frames);
// 3. 更新任务状态并持久化
task.status = 'completed';
task.result = risks;
// 4. 写入分布式存储(自动同步到车机)
await DistributedStore.put(task.taskId, task);
}
}我们在两端初始化同一个分布式 KV Store:
// DistributedStore.ets(车机 & 无人机共用)
import distributedData from '@ohos.data.distributedData';
const kvManager = distributedData.createKVManager({
bundleName: 'com.example.roadguardian',
storeName: 'inspection_tasks'
});
export class DistributedStore {
static async put(key: string, value: any): Promise<void> {
await kvManager.put(key, JSON.stringify(value));
}
static async get<T>(key: string): Promise<T | null> {
const val = await kvManager.get(key);
return val ? JSON.parse(val) : null;
}
static on(event: 'dataChange', callback: (key: string) => void) {
kvManager.on(event, callback);
}
}车机监听状态变更,实时更新 UI:
// 车载端监听任务完成
DistributedStore.on('dataChange', async (taskId) => {
const task = await DistributedStore.get<RoadInspectionTask>(taskId);
if (task?.status === 'completed') {
this.showRiskAlert(task.result);
this.updateTaskListUI();
}
});实际部署中,我们遇到诸多挑战:
为此,我们加入:
// 任务超时检测
setInterval(async () => {
const pendingTasks = await TaskLocalCache.getPendingTasks();
for (const task of pendingTasks) {
if (Date.now() - task.createdAt > 60_000) {
task.retryCount++;
if (task.retryCount < 3) {
dispatcher.dispatchTask(task); // 重试
} else {
task.status = 'failed';
DistributedStore.put(task.taskId, task);
}
}
}
}, 10_000);过去,我写的是“功能”;现在,我构建的是“系统”。HarmonyOS 的分布式能力,让我第一次在应用层就触及 任务调度、状态一致性、跨设备通信 等系统级问题。
这套引擎目前已在测试路段稳定运行,支持:
更重要的是,它证明了:在鸿蒙生态下,普通开发者也能构建接近操作系统级别的协同体验。
未来,我们将引入 任务优先级抢占、多无人机路径规划、与 V2X 融合 等能力——而这一切,都始于一次从“单端”到“分布式”的勇敢跨越。
真正的智能,不在单个设备,而在设备之间的默契。