Importing files into a SQLite database is not using all cores
Stack Overflow user
Asked on 2022-02-17 18:32:29
Answers: 1, Views: 93, Followers: 0, Votes: -1

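Context

I am importing IMDB database files into a SQLite database with the help of EntityFrameworkCore. There are actually two files, title.basics and title.akas (which is linked to the basics file through its movie ID).

At first, I had a single thread that read lines from basics and looped through akas until the ID changed. That version had a problem, and above all it was too slow. So I decided to write multithreaded code that reads both files at the same time, with another part that combines the akas with the appropriate movie.

I am currently running the import, so I still don't know whether my problem is fixed (it probably is). It is still too slow for me, though.

Issue

The combining part is still very slow, but more importantly, I can see that my process uses only about 12% of the CPU, which corresponds to just 1/8 of the total, and I have 8 physical cores. So it looks like the process is using only one core.

I am not including any code inline here, because a minimal testable example would not mean much in this case. You can, however, see both versions here: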

https://cints.net/public/Imdb-MultiThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        private readonly string temporaryFolder = @"c:\temp\";
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        MediaRecognizerContext db = new();

        private IQueryable<MetaMovie> imdbMovies = null;

        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach(var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;
                if (!File.Exists(compressedFile) || !File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today))
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);
            var buffer = new byte[1024];
            int nRead;
            while ((nRead = await csStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                await fd.WriteAsync(buffer, 0, nRead);
            }
        }

        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;


            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);
            var dbLock = new SemaphoreSlim(1);
            var akasLock = new SemaphoreSlim(1);
            var currentTitlesAkasLock = new SemaphoreSlim(1);
            var associateLock = new SemaphoreSlim(1);

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;

                var titles = new ConcurrentDictionary<string, MetaMovie>();
                var readTitles = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(titlesFile), (titleLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                        dbLock.Wait();
                        MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).Include(m => m.Titles).FirstOrDefault();
                        dbLock.Release();
                        if (metaMovie == null)
                        {
                            int totalMinutes = -1;
                            if (!int.TryParse(movieInfos[7], out totalMinutes))
                            {
                                totalMinutes = -1;
                            }
                            metaMovie = new MetaMovie
                            {
                                ExternalId = movieInfos[0],
                                MetaSource = nameof(Imdb),
                                MovieType = movieInfos[1],
                                Title = movieInfos[3],
                                TotalMinutes = totalMinutes,
                                Genres = movieInfos[8]
                            };
                            metaMovie.Titles = new List<MetaTitle>();
                            if (int.TryParse(movieInfos[5], out int startYear))
                            {
                                metaMovie.StartYear = new DateTime(startYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.StartYear = new DateTime(9999, 1, 1);
                            }
                            if (int.TryParse(movieInfos[6], out int endYear))
                            {
                                metaMovie.EndYear = new DateTime(endYear, 1, 1);
                            }
                            else
                            {
                                metaMovie.EndYear = metaMovie.StartYear;
                            }
                        }

                        titles.TryAdd(metaMovie.ExternalId, metaMovie);
                    });
                });

                var akas = new Dictionary<string, List<MetaTitle>>();
                var currentTitlesAkas = new ConcurrentDictionary<string, int>();
                var readAkas = Task.Factory.StartNew(() =>
                {
                    Parallel.ForEach(File.ReadLines(akasFile), (akaLine, _, readingIndex) =>
                    {
                        if (readingIndex == 0) return; // Skipping columns titles line

                        currentTitlesAkasLock.Wait();
                        var titleInfos = akaLine.Split("\t", StringSplitOptions.None);
                        var externalId = titleInfos[0];
                        if (!currentTitlesAkas.ContainsKey(externalId))
                        {
                            currentTitlesAkas.TryAdd(externalId, 1);
                        }
                        else
                        {
                            currentTitlesAkas[externalId]++;
                        }
                        currentTitlesAkasLock.Release();

                        var metaTitle = new MetaTitle
                        {
                            MetaMovie = null,
                            Text = titleInfos[2],
                            Region = titleInfos[3],
                            Language = titleInfos[4]
                        };

                        akasLock.Wait();
                        List<MetaTitle> titleAkas;
                        if (!akas.ContainsKey(externalId))
                        {
                            titleAkas = new List<MetaTitle>();
                            akas.Add(externalId, titleAkas);
                        }
                        else
                        {
                            titleAkas = akas[externalId];
                        }
                        titleAkas.Add(metaTitle);
                        akasLock.Release();

                        currentTitlesAkasLock.Wait();
                        currentTitlesAkas[externalId]--;
                        currentTitlesAkasLock.Release();
                    });
                });

                var savingCounter = 0;
                var associate = Task.Factory.StartNew(() =>
                {
                    Parallel.For(1, Environment.ProcessorCount * 10, async (_) =>
                    {
                        var isAssociating = true;
                        do
                        {
                            var externalId = string.Empty;
                            var currentTitleAkaRemoved = false;
                            currentTitlesAkasLock.Wait();
                            foreach (var curExternalId in currentTitlesAkas.Keys.OrderBy(t => t))
                            {
                                if (currentTitlesAkas[curExternalId] == 0)
                                {
                                    externalId = curExternalId;
                                    break;
                                }
                            }
                            if (externalId != String.Empty)
                            {
                                currentTitleAkaRemoved = currentTitlesAkas.TryRemove(externalId, out int useless0); // Removing so other threads won't take it
                            }
                            isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                            currentTitlesAkasLock.Release();

                            if (String.IsNullOrEmpty(externalId) || !currentTitleAkaRemoved) continue;

                            if (titles.TryGetValue(externalId, out MetaMovie metaMovie))
                            {
                                akasLock.Wait();
                                var titleAkas = akas[externalId];
                                akas.Remove(externalId);
                                akasLock.Release();

                                var changedMovie = false;
                                var movieAkas = metaMovie.Titles.Select(t => t).ToList(); // Clone list
                                foreach (var metaTitle in titleAkas)
                                {
                                    var existingTitle = movieAkas.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                                    if (existingTitle == null)
                                    {
                                        changedMovie = true;
                                        metaMovie.Titles.Add(metaTitle);
                                    }
                                    else
                                    {
                                        movieAkas.Remove(existingTitle);
                                    }
                                }
                                foreach (var movieTitle in movieAkas)
                                {
                                    changedMovie = true;
                                    metaMovie.Titles.Remove(movieTitle);
                                }

                                dbLock.Wait();
                                if (metaMovie.Id == 0)
                                {
                                    db.Add(metaMovie);
                                }
                                else if (changedMovie)
                                {
                                    db.Update(metaMovie);
                                }
                                dbLock.Release();

                                currentTitlesAkasLock.Wait();
                                currentTitlesAkas.TryRemove(externalId, out int uselessOut); // Free memory
                                isAssociating = !readAkas.IsCompleted || !readTitles.IsCompleted || !currentTitlesAkas.IsEmpty;
                                currentTitlesAkasLock.Release();

                                titles.TryRemove(externalId, out MetaMovie uselessOut2); // Free memory

                                associateLock.Wait();
                                savingCounter++;
                                var localSavingCounter = savingCounter;
                                associateLock.Release();

                                if (localSavingCounter != 0 && localSavingCounter % 1000 == 0)
                                {
                                    var ttt = currentTitlesAkas.Where(t => t.Value > 0);
                                    dbLock.Wait();
                                    await db.SaveChangesAsync();
                                    dbLock.Release();
                                    Console.WriteLine("Saved " + localSavingCounter);
                                }
                            }
                            else if (!readTitles.IsCompleted) // If reading titles is not ended, then maybe it was not read yet... otherwise, it doesn't exist
                            {
                                currentTitlesAkasLock.Wait();
                                currentTitlesAkas.TryAdd(externalId, 0); // Readd because still no movie associated
                                currentTitlesAkasLock.Release();
                            }
                        } while (isAssociating);
                    });
                });

                Task.WaitAll(readTitles, readAkas, associate);
                await db.SaveChangesAsync();
            }
        }

        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var movie = await ExtractInfosAsync(directory);
            if (movie == null) return null;

            if (imdbMovies == null)
            {
                imdbMovies = db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
            }

            return FindCorrespondances(imdbMovies, movie);
        }
    }
}

https://cints.net/public/Imdb-SingleThread.cs.txt

using com.cyberinternauts.all.MediaRecognizer.Database;
using com.cyberinternauts.all.MediaRecognizer.Models.Metas;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Threading.Tasks;

namespace com.cyberinternauts.all.MediaRecognizer.MetaSources
{
    class Imdb : MediaSource
    {
        private const string TITLES_FILE = "title.basics.tsv.gz";
        private const string AKAS_FILE = "title.akas.tsv.gz";
        private readonly string temporaryFolder = @"c:\temp\";
        private readonly string baseUrl = "https://datasets.imdbws.com/";
        private readonly WebClient webClient = new();

        MediaRecognizerContext db = new();

        private IQueryable<MetaMovie> imdbMovies = null;

        private async Task<bool> GatherFilesAsync()
        {
            var totalFilesGathered = 0;
            var filesToDownload = new string[] { AKAS_FILE, TITLES_FILE };
            foreach(var fileToDownload in filesToDownload)
            {
                var compressedFile = temporaryFolder + fileToDownload;
                if (!File.Exists(compressedFile) || !File.GetLastWriteTime(compressedFile).Date.Equals(DateTime.Today))
                {
                    await GatherFileAsync(fileToDownload);
                    totalFilesGathered++;
                }
            }

            return totalFilesGathered != 0;
        }

        private async Task GatherFileAsync(string fileName)
        {
            var compressedFile = temporaryFolder + fileName;
            var uncompressedFile = temporaryFolder + Path.GetFileNameWithoutExtension(compressedFile);
            await webClient.DownloadFileTaskAsync(baseUrl + fileName, compressedFile);

            using Stream fd = File.Create(uncompressedFile);
            using Stream fs = File.OpenRead(compressedFile);
            using Stream csStream = new GZipStream(fs, CompressionMode.Decompress);
            var buffer = new byte[1024];
            int nRead;
            while ((nRead = await csStream.ReadAsync(buffer, 0, buffer.Length)) > 0)
            {
                await fd.WriteAsync(buffer, 0, nRead);
            }
        }

        private async Task LoadMetaDataAsync()
        {
            //return; //TODO: Remove this line

            //TODO: Reactivate this line
            //if (!await GatherFilesAsync()) return;

            var titlesFile = temporaryFolder + Path.GetFileNameWithoutExtension(TITLES_FILE);
            var akasFile = temporaryFolder + Path.GetFileNameWithoutExtension(AKAS_FILE);
            var titlesLines = File.ReadLines(titlesFile);
            var akasLines = File.ReadLines(akasFile);


            var titlesIterator = titlesLines.GetEnumerator();
            titlesIterator.MoveNext(); // Skip columns headers

            var akasIterator = akasLines.GetEnumerator();
            akasIterator.MoveNext();
            akasIterator.MoveNext(); // Done twice to skip columns headers
            var currentAka = akasIterator.Current;
            var savingCounter = 0;

            using (var db = new MediaRecognizerContext())
            {
                db.ChangeTracker.AutoDetectChangesEnabled = false;
                while (titlesIterator.MoveNext())
                {
                    var titleLine = titlesIterator.Current;
                    var movieInfos = titleLine.Split("\t", StringSplitOptions.None);
                    MetaMovie metaMovie = db.MetaMovies.Where(m => m.ExternalId == movieInfos[0]).FirstOrDefault();
                    var isNewMovie = false;
                    if (metaMovie == null)
                    {
                        int totalMinutes = -1;
                        if (!int.TryParse(movieInfos[7], out totalMinutes))
                        {
                            totalMinutes = -1;
                        }
                        isNewMovie = true;
                        metaMovie = new MetaMovie
                        {
                            ExternalId = movieInfos[0],
                            MetaSource = nameof(Imdb),
                            MovieType = movieInfos[1],
                            Title = movieInfos[3],
                            TotalMinutes = totalMinutes,
                            Genres = movieInfos[8]
                        };
                        metaMovie.Titles = new List<MetaTitle>();
                        if (int.TryParse(movieInfos[5], out int startYear))
                        {
                            metaMovie.StartYear = new DateTime(startYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.StartYear = new DateTime(9999, 1, 1);
                        }
                        if (int.TryParse(movieInfos[6], out int endYear))
                        {
                            metaMovie.EndYear = new DateTime(endYear, 1, 1);
                        }
                        else
                        {
                            metaMovie.EndYear = metaMovie.StartYear;
                        }
                    }

                    var movieAkasIds = metaMovie.Titles.Select(t => t.Id).ToList();
                    var titleInfos = currentAka?.Split("\t", StringSplitOptions.None);
                    while (currentAka != null && int.Parse(titleInfos[0][2..]) <= int.Parse(metaMovie.ExternalId[2..]))
                    {
                        if (titleInfos[0] == metaMovie.ExternalId)
                        {
                            var metaTitle = new MetaTitle
                            {
                                MetaMovie = metaMovie,
                                Text = titleInfos[2],
                                Region = titleInfos[3],
                                Language = titleInfos[4]
                            };

                            var existingTitle = metaMovie.Titles.Where(t => t.Text == metaTitle.Text && t.Region == metaTitle.Region && t.Language == metaTitle.Language).FirstOrDefault();
                            if (existingTitle == null)
                            {
                                metaMovie.Titles.Add(metaTitle);
                            }
                            else
                            {
                                movieAkasIds.Remove(existingTitle.Id);
                            }
                        }
                        else
                        {
                            var a = 1;
                        }

                        akasIterator.MoveNext();
                        currentAka = akasIterator.Current;
                        titleInfos = currentAka.Split("\t", StringSplitOptions.None);
                    }

                    foreach(var movieTitleId in movieAkasIds)
                    {
                        metaMovie.Titles.Remove(metaMovie.Titles.Where(t => t.Id == movieTitleId).FirstOrDefault());
                    }

                    if (isNewMovie)
                    {
                        db.Add(metaMovie);
                    }
                    else
                    {
                        db.Update(metaMovie);
                    }

                    savingCounter++;
                    if (savingCounter % 10000 == 0)
                    {
                        await db.SaveChangesAsync();
                        Console.WriteLine("Saved " + savingCounter);
                    }
                }

                await db.SaveChangesAsync();
            }
        }

        public async override Task<IEnumerable<MetaMovie>> FindMediasAsync(DirectoryInfo directory)
        {
            await LoadMetaDataAsync();

            var movie = await ExtractInfosAsync(directory);
            if (movie == null) return null;

            if (imdbMovies == null)
            {
                imdbMovies = db.MetaMovies.Where(m => m.MetaSource == nameof(Imdb) && m.MovieType == "movie");
            }

            return FindCorrespondances(imdbMovies, movie);
        }
    }
}

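In the multithreaded version, the slow part is in the LoadMetaDataAsync method, more precisely in the var associate = Task.Factory.StartNew(() => section of the code.

This is still in development; cleanup and splitting will be done once I have the proper results/speed.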
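A side observation on the associate section, offered as a hedged sketch and not as something the thread itself confirms: that section passes an async lambda to Parallel.For. Parallel.For accepts an Action<int>, so the lambda compiles as async void and the loop does not wait for anything that runs after the first incomplete await. The small demo below illustrates this and contrasts it with Task.WhenAll. The class and method names are hypothetical, and Task.Delay merely stands in for awaited work such as SaveChangesAsync.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncLambdaDemo
{
    // Counts how many loop bodies have finished at the moment Parallel.For returns.
    public static int FinishedWhenParallelForReturns()
    {
        var finished = 0;
        // The async lambda becomes async void: Parallel.For only sees the part
        // of each body that runs before the first incomplete await.
        Parallel.For(0, 4, async _ =>
        {
            await Task.Delay(500); // stand-in for awaited work (assumption)
            Interlocked.Increment(ref finished);
        });
        return Volatile.Read(ref finished);
    }

    // Same work, but each body is a Task that Task.WhenAll actually awaits.
    public static async Task<int> FinishedWhenWhenAllReturns()
    {
        var finished = 0;
        var workers = Enumerable.Range(0, 4).Select(_ => Task.Run(async () =>
        {
            await Task.Delay(500);
            Interlocked.Increment(ref finished);
        })).ToArray();
        await Task.WhenAll(workers);
        return Volatile.Read(ref finished);
    }

    public static async Task Main()
    {
        Console.WriteLine(FinishedWhenParallelForReturns());
        Console.WriteLine(await FinishedWhenWhenAllReturns());
    }
}
```

In practice the first call reports 0 finished bodies, because the 500 ms delay far outlasts the loop itself, while the second reports 4. Whether this contributes to the slowness described above is not established by the thread; it only shows that work placed after an await inside Parallel.For is not tracked by the loop.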


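1 Answer

Stack Overflow user

Accepted answer

Posted on 2022-02-23 21:47:48

Case closed. I went back to the single-threaded version and found my initial problem: my code assumed the files were ordered, but they are only partially ordered.

Thanks to everyone who participated.

Votes: 0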
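To illustrate the ordering issue, here is a minimal sketch of one way to make the aka lookup independent of file order: group the aka lines by title ID in a dictionary first, then look each movie up by ID. This is not the fix the author applied (the answer does not show it). The class name, method name, and sample lines are hypothetical, and the column positions (ID in column 0, title text in column 2) follow the question's code.

```csharp
using System;
using System.Collections.Generic;

public static class AkaGrouping
{
    // Groups aka title texts by title ID (first tab-separated column)
    // without assuming the lines arrive sorted by ID.
    public static Dictionary<string, List<string>> GroupAkasByTitleId(IEnumerable<string> akaLines)
    {
        var akasByTitleId = new Dictionary<string, List<string>>();
        foreach (var line in akaLines)
        {
            var columns = line.Split('\t');
            if (columns.Length < 3) continue; // skip malformed lines

            var titleId = columns[0];
            if (!akasByTitleId.TryGetValue(titleId, out var texts))
            {
                texts = new List<string>();
                akasByTitleId[titleId] = texts;
            }
            texts.Add(columns[2]); // column 2 holds the alternative title text
        }
        return akasByTitleId;
    }

    public static void Main()
    {
        // Hypothetical sample: tt0000002 appears again after tt0000003,
        // which a "loop until the ID changes" merge would miss.
        var akaLines = new[]
        {
            "tt0000001\t1\tCarmencita\tUS\t\\N",
            "tt0000002\t1\tLe clown et ses chiens\tFR\t\\N",
            "tt0000003\t1\tPauvre Pierrot\tFR\t\\N",
            "tt0000002\t2\tThe Clown and His Dogs\tUS\t\\N",
        };

        var grouped = GroupAkasByTitleId(akaLines);
        foreach (var titleId in new[] { "tt0000001", "tt0000002", "tt0000003" })
        {
            var count = grouped.TryGetValue(titleId, out var texts) ? texts.Count : 0;
            Console.WriteLine($"{titleId}: {count} aka(s)");
        }
    }
}
```

With real files, the caller would skip the header line before passing the lines in. The trade-off is memory: this keeps every aka in memory at once, whereas the streaming merge in the question reads both files in a single pass but depends on both being fully sorted by ID.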
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/71163604
