专栏首页区块链实战【Java多线程】写入同一文件,自定义线程池与线程回收利用2 顶

【Java多线程】写入同一文件,自定义线程池与线程回收利用2 顶

起初为了方便快捷,只为实现功能,写了很多垃圾的代码. 造成性能不高,可读性,可维护性不强。

朋友们提了很多意见,我都吸取了经验,于是将代码又改动了一下。

经过测试,运行效率显著提升:

任务完成时间:30508 ms

任务完成时间:30735 ms

任务完成时间:31167 ms

package test.com.linapex.room;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

import com.linapex.common.util.ZhengzeValidate;

public class TBuilderRoomSqlFileTool2
{
	final static int BSIZE = 1024 * 1024;
	final static int DATACACHENUM = 10000;

	static int currThreadCount = 0;
	static int maxThreadCount = 9;

	static File roomFilterLogFile = new File("roomFilter.log");
	static File sqlFile = new File("roomSql.sql");
	static File csvFile = new File("D:\\baiduyundownload\\如家汉庭等酒店2000W开房数据\\2000W\\1-200W.csv");

	final static String sqlStrTemplate = "INSERT INTO `t_room_record`(id ,name, card, gender, birthday, address, zip, mobile, email, version) VALUES (null,':0', ':1', ':2', ':3', ':4', ':5', ':6', ':7',':8');";

	public static BufferedWriter initSQLWrite(File sqlFile) throws Exception
	{
		if (!sqlFile.exists())
		{
			if (!sqlFile.createNewFile())
			{
				System.err.println("创建文件失败,已存在:" + sqlFile.getAbsolutePath());
			}
		}

		return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(sqlFile, true), "UTF-8"));
	}

	public static void loadCSV(CallBack3<Void> callBack) throws Exception
	{
		//		BufferedReader reader = null;
		//		try
		//		{
		//			reader = new BufferedReader(new FileReader(csvFile));
		//			String str = null;
		//
		//			int num = 0;
		//			while ((str = reader.readLine()) != null)
		//			{
		//				num++;
		//				callBack.call(num, str);
		//			}
		//		} finally
		//		{
		//			reader.close();
		//		}

		FileChannel inChannel = null;
		try
		{
			String enterStr = "\n";

			inChannel = new FileInputStream(csvFile).getChannel();

			ByteBuffer buffer = ByteBuffer.allocate(BSIZE);

			StringBuilder newlinesBui = new StringBuilder();
			int num = 0;
			while (inChannel.read(buffer) != -1)
			{
				buffer.flip();

				//数据组合.
				String content = new String(buffer.array());
				newlinesBui.append(content).toString();

				int fromIndex = 0;
				int endIndex = -1;
				//循环找到 \n
				while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1)
				{
					//得到一行
					String line = newlinesBui.substring(fromIndex, endIndex);

					num++;
					callBack.call(num, line);

					fromIndex = endIndex + 1;
				}

				newlinesBui.delete(0, fromIndex);
				buffer.clear();
			}

		} finally
		{
			if (inChannel != null)
			{
				inChannel.close();
			}
		}

	}

	public static void main(String[] args) throws Exception
	{
		final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreadCount);

		final List<Future<String>> threadResultList = new ArrayList<Future<String>>();

		final BufferedWriter bw = initSQLWrite(sqlFile); //主要的buffer对象.

		final WriteSqlHandle2 writeSqlFile = new WriteSqlHandle2(DATACACHENUM);

		StopWatch2 stopWatch = new StopWatch2();
		stopWatch.start();

		loadCSV(new CallBack3<Void>()
		{

			@Override
			public Void call(int num, String str)
			{
				String[] strs = str.split(",");

				if (strs.length < 8)
				{
					writeLog("此条数据不录入::0", Arrays.toString(strs));
					return null;
				}

				String name = strs[0].trim();
				if (!ZhengzeValidate.isChina(name))
				{
					writeLog("此条数据不录入::0", Arrays.toString(strs));
					return null;
				}

				try
				{
					String card = strs[4];
					String gender = strs[5];
					String birthday = strs[6];
					String address = strs[7];
					String zip = strs[8];
					String mobile = strs[20];
					String email = strs[22];
					String version = strs[31];

					//生成sql语句
					final String tempSql = tm(sqlStrTemplate, name, card, gender, birthday, address, zip, mobile, email, version);

					//添加数据,如果超出了缓存数据,则 开始写入文件系统
					if (writeSqlFile.add(tempSql))
					{
						currThreadCount++;

						//如果提交的线程过多,则取回之后再提交.
						if (currThreadCount >= maxThreadCount)
						{
							//							System.out.println(String.format("当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
							for (Future<String> fs : threadResultList)
							{
								String tempSqlName = fs.get();

								currThreadCount--;
								//								System.out.println("已回调线程数:" + (maxThreadCount - currThreadCount) + "  线程返回的值:" + tempSqlName);
							}

							threadResultList.clear(); //清空
							currThreadCount = threadResultList.size();
							//							System.out.println(String.format("重新开始提交线程   当前线程数:%s 允许最大线程数:%s 等待线程完成回调.", currThreadCount, maxThreadCount));
						}

						Future<String> future = threadPool.submit(new TaskWithResult(writeSqlFile, bw));

						threadResultList.add(future);
						//						System.out.println(String.format("开启了%s条线程(保存了%s条数据)", curr_thread_count, num));
					}

				} catch (Exception e)
				{
					writeLog("录入错误的数据::0", Arrays.toString(strs));
					writeLog("错误的原因::0", e.getMessage());
				}
				return null;
			}
		});

		writeSqlFile.flush(bw);

		threadPool.shutdown();

		stopWatch.stop();

		System.out.println(String.format("任务完成时间:%s ms", stopWatch.getTime()));

	}

	public static void writeLog(String str, Object... values)
	{
		//FileUtils.doWriteFile(roomFilterLogFile.getAbsolutePath(), tm(str, values) + "\r\n", null, false);
	}

	public static String tm(String strSource, Object... values)
	{
		if (strSource == null)
		{
			return null;
		}

		StringBuilder builder = new StringBuilder(strSource);

		final String prefix = ":";
		for (int index = 0; index < values.length; index++)
		{
			String value = values[index].toString();
			if (value == null)
			{
				continue;
			}

			String key = new StringBuilder(prefix).append(index).toString();

			int i = -1;
			if ((i = builder.indexOf(key, i)) > -1)
			{
				int len = key.length();
				builder.replace(i, i + len, value);
			}
		}

		return builder.toString();
	}

}

class TaskWithResult implements Callable<String>
{
	WriteSqlHandle2 handle2;

	BufferedWriter bufferedWriter;

	public TaskWithResult(WriteSqlHandle2 handle2, BufferedWriter bufferedWriter)
	{
		this.handle2 = handle2;
		this.bufferedWriter = bufferedWriter;
	}

	@Override
	public String call() throws Exception
	{
		String fileName = Thread.currentThread().getName();

		handle2.save(bufferedWriter);

		return fileName;
	}

}

class WriteSqlHandle2
{
	ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	WriteLock writeLock = readWriteLock.writeLock();

	List<String> cacheList;

	int currItemCount = 0;

	int dataCacheNum;

	public WriteSqlHandle2()
	{
		cacheList = new ArrayList<String>();
	}

	public WriteSqlHandle2(int dataCacheNum)
	{
		this.dataCacheNum = dataCacheNum;
		cacheList = new ArrayList<String>(dataCacheNum);
	}

	public boolean isCacheExpires()
	{
		return currItemCount >= dataCacheNum;
	}

	public boolean add(String sqlStr)
	{
		try
		{
			writeLock.lock();
			cacheList.add(sqlStr);
			currItemCount++;
			return isCacheExpires();
		} finally
		{
			writeLock.unlock();
		}
	}

	public void save(BufferedWriter bw) throws Exception
	{
		try
		{
			writeLock.lock();

			//如果数据没有超出缓存.则返回.
			if (!isCacheExpires())
			{
				return;
			}

			StopWatch2 stopWatch = new StopWatch2();
			stopWatch.start();

			//			System.out.println(String.format("%s,准备消费   需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

			for (String str : cacheList)
			{
				bw.write(str + "\r\n");
				currItemCount--;
			}

			stopWatch.stop();

			System.out.println(String.format("%s,消费完成,耗费时间:%s ms,消费数据长度:%s", Thread.currentThread().getName(), stopWatch.getTime(), cacheList.size()));

			cacheList.clear(); //清空数据.
		} finally
		{
			writeLock.unlock();
		}
	}

	public void flush(BufferedWriter bw) throws Exception
	{
		System.out.println(String.format("flush线程:%s, 需要保存数据的集合长度:%s", Thread.currentThread().getName(), cacheList.size()));

		for (String str : cacheList)
		{
			bw.write(str + "\r\n");
		}

		System.out.println(String.format("flush线程:%s, 消费完成,消费数据长度:%s", Thread.currentThread().getName(), cacheList.size()));

		cacheList.clear(); //清空数据

		closeWrite(bw);
	}

	private void closeWrite(BufferedWriter bw) throws Exception
	{
		bw.flush();
		bw.close();
	}

}

class StopWatch2
{
	long begin;
	long end;

	public void start()
	{
		begin = System.currentTimeMillis();
	}

	public void stop()
	{
		end = System.currentTimeMillis();
	}

	public long getTime()
	{
		return end - begin;
	}
}

interface CallBack3<T>
{
	T call(int num, String str);
}

(adsbygoogle = window.adsbygoogle || []).push({});

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【Java多线程】写入同一文件,自定义线程池与线程回收利用 顶

    一个文件好几百兆,1个文件大概200万行左右的数据,现在我要解决的问题是,将 csv的数据读出来,组合数据,生成sql文件。

    linapex
  • 刚刚更新:在线聊天系统设计(原理+思路+源码+效果图) 顶

    虚拟,普通用户有一个好友列表,好友列表保存着用户的好友,对于虚拟,普通用户来说,他们的好友列表只有高级用户+管理员用户。

    linapex
  • 【Java】BufferedReader与NIO读取文件性能测试

    我对 BufferedReader  与 NIO  读取文件效果做了一个简单的测试

    linapex
  • 数据源管理 | 关系型分库分表,列式库分布式计算

    随着业务发展,数据量的越来越大,业务系统越来越复杂,拆分的概念逻辑就应运而生。数据层面的拆分,主要解决部分表数据过大,导致处理时间过长,长期占用链接,甚至出现大...

    知了一笑
  • 从爬取的文章 HTML 中提取出中文关键字

    https://github.com/KotlinSpringBoot/saber

    一个会写诗的程序员
  • 剖析更高级的Redis客户端Lettuce

    Lettuce是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-dat...

    黄泽杰
  • Redis高级客户端Lettuce详解

    Lettuce是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-dat...

    猿天地
  • 【Java多线程】写入同一文件,自定义线程池与线程回收利用 顶

    一个文件好几百兆,1个文件大概200万行左右的数据,现在我要解决的问题是,将 csv的数据读出来,组合数据,生成sql文件。

    linapex
  • Android 中Volley二次封装并实现网络请求缓存

    Android目前很多同学使用Volley请求网络数据,但是Volley没有对请求过得数据进行缓存,因此需要我们自己手动缓存。 一下就是我的一种思路,仅供参考

    砸漏
  • 小瓜牛漫谈 — String

    String 类在 Java 中代表字符串。Java 程序中的所有字符串字面值(如 "abc" )都作为此类的实例实现。 1 public static voi...

    猿人谷

扫码关注云+社区

领取腾讯云代金券