前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用Oracle dbms_pipe实现存储过程之间的通信

利用Oracle dbms_pipe实现存储过程之间的通信

作者头像
用户1148526
发布2019-05-25 19:38:21
6790
发布2019-05-25 19:38:21
举报
文章被收录于专栏:Hadoop数据仓库

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://cloud.tencent.com/developer/article/1433101

应用程序开发人员的需求是这样的:

  1. 根据条件给每一个国家的商品生成唯一7位随机代码,不同国家之间的商品代码可以相同
  2. 如果输入标准分隔符的字符串,则解析该字符串作为需要生成的商品ID,为其生成代码,否则为商品表中所有商品ID生成代码
  3. 代码的每一位要符合相应的规则,例如第一位的规则是0123,则这位只能是0、1、2、3中的一个数
  4. 由于可能一次生成大量的代码,这个过程需要较长时间,所以需要用进度条提示生成进度
  5. 可以在生成过程执行中终止过程
  6. 返回需要生成的代码个数和实际生成的代码个数
  7. 每次提交的个数可以通过参数定义,例如共要生成10万的代码,每次提交1000个

设计思路:

  1. 为每个国家预生成0到9999999一千万个随机数作为候选代码池表,每生成一个代码就从代码池中删除一个,避免查重操作
  2. 用一个存储过程生成代码,另一个过程用来终止生成过程,两个过程间用dbms_pipe进行通信
  3. 用一个函数返回当前已经生成的代码个数,供显示进度条的外部程序调用,生成代码过程与该函数用dbms_pipe进行通信
  4. 用bulk collect批量处理提高效率

源代码:

c-sharp view plain copy

  1. -- 1. 生成可用代码池,重复执行过程为每一个国家生成一个可用代码池表,表名为org_code_加上两位国家代码
  2. CREATE OR REPLACE PROCEDURE p_org_code
  3. IS
  4. BEGIN
  5. -- tmp是一个提交后删除行的全局临时表
  6. FOR i IN 0 .. 9999999
  7. LOOP
  8. INSERT INTO tmp
  9. VALUES (LPAD (i, 7, '0'));
  10. END LOOP;
  11. INSERT INTO org_code_00
  12. SELECT \*
  13. FROM tmp
  14. ORDER BY DBMS\_RANDOM.VALUE;
  15. COMMIT;
  16. END;
  17. -- 2. 代码生成过程
  18. CREATE OR REPLACE PROCEDURE p_gen_code (
  19. p_country_code IN VARCHAR2, -- 国家代码
  20. p_p1 IN VARCHAR2, -- 第一位规则字符串,如012345
  21. p_p2 IN VARCHAR2, -- 第二位规则字符串,如012345
  22. p_p3 IN VARCHAR2, -- 第三位规则字符串,如012345
  23. p_p4 IN VARCHAR2, -- 第四位规则字符串,如012345
  24. p_p5 IN VARCHAR2, -- 第五位规则字符串,如012345
  25. p_p6 IN VARCHAR2, -- 第六位规则字符串,如012345
  26. p_p7 IN VARCHAR2, -- 第七位规则字符串,如012345
  27. p_instr IN VARCHAR2, -- 商品ID字符串,用,做分隔符
  28. p_count IN NUMBER DEFAULT 1000, -- 每次提交的个数
  29. r_outstr OUT VARCHAR2 -- 输出需要生成的代码个数和实际生成的代码个数
  30. )
  31. IS
  32. l_id DBMS_SQL.varchar2_table;
  33. l_mc DBMS_SQL.varchar2_table;
  34. l_code DBMS_SQL.varchar2_table;
  35. l_mc_str VARCHAR2 (255);
  36. l_code_str VARCHAR2 (7);
  37. l_id_count INT := 0;
  38. l_code_count INT := 0;
  39. l_idx INT;
  40. l_start INT := 1;
  41. l_substr VARCHAR2 (32);
  42. l_instr VARCHAR2 (2000);
  43. l_length INT;
  44. l_tablename VARCHAR2 (30) := 'org_code_' || p_country_code;
  45. l_sql VARCHAR2 (2000);
  46. l_count INT := 0;
  47. l_pointer INT := 0;
  48. l_real_count INT := 0;
  49. -- IPC
  50. l_pipename CONSTANT VARCHAR2 (12) := 'mypipe';
  51. l_pipe_getcount CONSTANT VARCHAR2 (12) := 'getcount';
  52. l_pipe_retcount CONSTANT VARCHAR2 (12) := 'retcount';
  53. l_send_result INT;
  54. BEGIN
  55. DBMS_PIPE.PURGE (l_pipename);
  56. IF p_instr IS NULL
  57. THEN
  58. BEGIN
  59. SELECT ID, mc
  60. BULK COLLECT INTO l\_id, l\_mc
  61. FROM product;
  62. l\_id\_count := l\_id.COUNT;
  63. l\_sql :=
  64. 'select code1 from (select code1 from '
  65. || l\_tablename
  66. || ' where instr('''
  67. || p\_p1
  68. || ''',substr(code1,1,1)) > 0 and instr('''
  69. || p\_p2
  70. || ''',substr(code1,2,1)) > 0 and instr('''
  71. || p\_p3
  72. || ''',substr(code1,3,1)) > 0 and instr('''
  73. || p\_p4
  74. || ''',substr(code1,4,1)) > 0 and instr('''
  75. || p\_p5
  76. || ''',substr(code1,5,1)) > 0 and instr('''
  77. || p\_p6
  78. || ''',substr(code1,6,1)) > 0 and instr('''
  79. || p\_p7
  80. || ''',substr(code1,7,1)) > 0 where rownum <= '
  81. || l\_id\_count;
  82. EXECUTE IMMEDIATE l\_sql
  83. BULK COLLECT INTO l\_code;
  84. l\_code\_count := l\_code.COUNT;
  85. WHILE (l\_count < l\_code\_count)
  86. LOOP
  87. IF DBMS\_PIPE.receive\_message (l\_pipename, 0) = 0
  88. THEN
  89. DBMS\_PIPE.unpack\_message (l\_pipebuf);
  90. EXIT WHEN l\_pipebuf = 'stop';
  91. END IF;
  92. IF DBMS\_PIPE.receive\_message (l\_pipe\_getcount, 0) = 0
  93. THEN
  94. DBMS\_PIPE.PURGE (l\_pipe\_retcount);
  95. dbms\_pipe\_pack\_message (TO\_CHAR (l\_count) || '|running');
  96. l\_send\_result := DBMS\_PIPE.send\_message (l\_pipe\_retcount);
  97. END IF;
  98. l\_real\_count :=
  99. LEAST (p\_count, l\_code\_count - l\_pointer \* p\_count);
  100. FOR i IN 1 .. l\_real\_count
  101. LOOP
  102. INSERT INTO product\_code
  103. VALUES (l\_code (l\_pointer \* p\_count + i),
  104. l\_mc (l\_pointer \* p\_count + i),
  105. p\_country\_code);
  106. EXECUTE IMMEDIATE 'delete from '
  107. || l\_tablename
  108. || ' where code1=:x'
  109. USING l\_code (l\_pointer \* p\_count + i);
  110. UPDATE product
  111. SET status = 1
  112. WHERE ID = l\_id (l\_pointer \* p\_count + i);
  113. END LOOP;
  114. COMMIT;
  115. l\_pointer := pointer + 1;
  116. l\_count := l\_count + l\_real\_count;
  117. END LOOP;
  118. DBMS\_PIPE.PURGE (l\_pipe\_retcount);
  119. DBMS\_PIPE.pack\_message (TO\_CHAR (l\_count) || '|end');
  120. l\_send\_result := DBMS\_PIPE.send\_message (l\_pipe\_retcount);
  121. r\_outstr := l\_id\_count || '|' || l\_count;
  122. DBMS\_OUTPUT.put\_line (r\_outstr);
  123. EXCEPTION
  124. WHEN NO\_DATA\_FOUND
  125. THEN
  126. COMMIT;
  127. WHEN OTHERS
  128. THEN
  129. RAISE;
  130. END;
  131. ELSE
  132. IF SUBSTR (p\_instr, -1, 1) = ','
  133. THEN
  134. l\_instr := p\_instr;
  135. ELSE
  136. l\_instr := p\_instr || ',';
  137. END IF;
  138. l\_length := LENGTH (l\_instr);
  139. <<outer\_loop>>
  140. WHILE l\_start < l\_length
  141. LOOP
  142. l\_id\_count := l\_id\_count + 1;
  143. l\_idx := INSTR (l\_instr, ',', l\_start);
  144. l\_substr := SUBSTR (l\_instr, l\_start, l\_idx - l\_start);
  145. l\_start := l\_idx + 1;
  146. IF DBMS\_PIPE.receive\_message (l\_pipename, 0) = 0
  147. THEN
  148. DBMS\_PIPE.unpack\_message (l\_pipebuf);
  149. EXIT WHEN l\_pipebuf = 'stop';
  150. END IF;
  151. IF DBMS\_PIPE.receive\_message (l\_pipe\_getcount, 0) = 0
  152. THEN
  153. DBMS\_PIPE.PURGE (l\_pipe\_retcount);
  154. DBMS\_PIPE.pack\_message (l\_code\_count || '|running');
  155. l\_send\_result := DBMS\_PIPE.send\_message (l\_pipe\_retcount);
  156. END IF;
  157. BEGIN
  158. SELECT mc
  159. INTO l\_mc\_str
  160. FROM product
  161. WHERE ID = l\_substr;
  162. EXCEPTION
  163. WHEN OTHERS
  164. THEN
  165. GOTO outer\_loop;
  166. END;
  167. l\_sql :=
  168. 'select code1 from (select code1 /*+ first_rows */ from '
  169. || l\_tablename
  170. || ' where instr('''
  171. || p\_p1
  172. || ''',
  173. substr(code1,1,1)) > 0 and instr('''
  174. || p\_p2
  175. || ''',
  176. substr(code1,2,1)) > 0 and instr('''
  177. || p\_p3
  178. || ''',
  179. substr(code1,3,1)) > 0 and instr('''
  180. || p\_p4
  181. || ''',
  182. substr(code1,4,1)) > 0 and instr('''
  183. || p\_p5
  184. || ''',
  185. substr(code1,5,1)) > 0 and instr('''
  186. || p\_p6
  187. || ''',
  188. substr(code1,6,1)) > 0 and instr('''
  189. || p\_p7
  190. || ''',
  191. substr(code1,1,1)) > 0 ) where rownum=1 ';
  192. BEGIN
  193. EXECUTE IMMEDIATE l\_sql
  194. INTO l\_code\_str;
  195. EXCEPTION
  196. WHEN NO\_DATA\_FOUND
  197. THEN
  198. EXIT;
  199. END;
  200. IF SQL%ROWCOUNT = 1
  201. THEN
  202. l\_code\_count := l\_code\_count + 1;
  203. END IF;
  204. INSERT INTO product\_code
  205. VALUES (l\_code\_str, l\_wzmc\_str, p\_country\_code);
  206. EXECUTE IMMEDIATE 'delete from ' || l\_tablename || ' where code1=:x'
  207. USING l\_code\_str;
  208. UPDATE product
  209. SET status = 1
  210. WHERE ID = l\_substr;
  211. END LOOP;
  212. COMMIT;
  213. DBMS\_PIPE.PURGE (l\_pipe\_retcount);
  214. DBMS\_PIPE.pack\_message (TO\_CHAR (l\_code\_count) || '|end');
  215. l\_send\_result := DBMS\_PIPE.send\_message (l\_pipe\_retcount);
  216. r\_outstr :=
  217. LENGTH (l\_instr)
  218. - LENGTH (REPLACE (l\_instr, ',', ''))
  219. || '|'
  220. || l\_code\_count;
  221. DBMS\_OUTPUT.put\_line (r\_outstr);
  222. END IF;
  223. END p_gen_code;
  224. -- 3. 终止过程
  225. CREATE OR REPLACE PROCEDURE p_stop
  226. IS
  227. l_pipename VARCHAR2 (12) := 'mypipe';
  228. l_create_result INTEGER := DBMS_PIPE.create_pipe (l_pipename);
  229. l_send_result INTEGER;
  230. BEGIN
  231. DBMS_PIPE.PURGE (l_pipename);
  232. DBMS_PIPE.pack_message ('stop');
  233. l_send_result := DBMS_PIPE.send_message (l_pipename);
  234. DBMS_OUTPUT.put_line ('l_send_result: ' || l_send_result);
  235. END p_stop;
  236. -- 4. 取得当前已经生成代码的个数
  237. CREATE OR REPLACE FUNCTION fn_getcount (p_timeout NUMBER)
  238. RETURN VARCHAR2
  239. IS
  240. l_pipebuf VARCHAR2 (20);
  241. l_pipe_getcount VARCHAR2 (12) := 'getcount';
  242. l_pipe_retcount VARCHAR2 (12) := 'retcount';
  243. l_status NUMBER;
  244. l_receive NUMBER;
  245. BEGIN
  246. DBMS_PIPE.PURGE (l_pipe_getcount);
  247. DBMS_PIPE.pack_message (l_pipe_getcount);
  248. l_status := DBMS_PIPE.send_message (l_pipe_getcount);
  249. l_receive :=
  250. DBMS\_PIPE.receive\_message (l\_pipe\_retcount, NVL (p\_timeout, 0));
  251. IF l_receive = 0
  252. THEN
  253. DBMS\_PIPE.unpack\_message (l\_pipebuf);
  254. RETURN TO\_CHAR (l\_pipebuf);
  255. ELSE
  256. RETURN TO\_CHAR ('Timed out!');
  257. END IF;
  258. END fn_getcount;
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016年12月28日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档