开发一个c#应用程序,该应用程序将根据用户上传的文件运行一系列任务;这可能需要几秒到几天的时间。如果过程中断,我计划实施某种记录系统以恢复工作。什么是这个记录中断/简历调用,我可以在哪里了解更多关于现有的实现?类似于文件系统日志记录或容错日志记录来管理任务的过程?
编辑:
有用的事情,到目前为止,我发现:
一个日志文件系统将使用相同的基本过程,具有 一些额外的步骤。喜欢的东西:
- 日记帐分录:从移动文件到B
- 物理旧文件复制到新的位置从旧驱动器
- 删除目录项新的驱动器上
- 更新目录项
- 旧驱动器上的可用空间
- 日记条目:完成将文件从A移动到B
从https://serverfault.com/questions/173176/what-is-a-journaling-file-system
点校验:书写的方法,轴颈元数据和数据到其固定的位置被称为检查点。当跨越各种阈值 ,例如时,触发点校验。,文件系统缓冲区空间不足时,日志中剩余空间不足或计时器到期时。
崩溃恢复:崩溃恢复是在EXT3简单(因为它是在许多日志文件系统);使用基本形式重做日志。因为新的更新(无论是 数据还是只是元数据)都写入日志,所以恢复就地文件系统结构的过程非常简单。在恢复过程中,文件系统会扫描日志以确认已完成的事务;不完整的交易被丢弃。完成的事务中的每个更新都会重放到固定位置的ext2结构中。
从http://research.cs.wisc.edu/adsl/Publications/sba-usenix05.pdf(第5页)
编辑2:
我不觉得我知道更多关于容错比当我第一次张贴了这个问题,但这里是一个我实施的大纲。
主要首先尝试让作业管理器从文件加载任何现有的已保存状态,或创建一个新的空管理器。
public static void Main(string[] args)
{
try
{
_jxman = JobManager<Job>.Load(Properties.Settings.Default.JobJournalFilename);
}
catch
{
_jxman = new JobManager<Job>(Properties.Settings.Default.JobJournalFilename);
}
...
_jxman.Start();
...
}
而且JobManager类看起来像这样
public sealed class JobManager<T> : WorkGroupBase<JobBase>, IWorkSerializable where T : IJob
{
#region Fields
/// <summary>
/// Hash goes here in file
/// </summary>
private const string _hashHeading = "SHA-256";
/// <summary>
/// Flag to know whether to update the journal
/// </summary>
private bool _isDirty = false;
/// <summary>
/// Last time the journal was written to disk
/// </summary>
private DateTime _lastSaveTime = DateTime.MinValue;
/// <summary>
/// Minimum time to wait before writing journal to disk again
/// </summary>
private TimeSpan _minTimeToSave = new TimeSpan(0,0,60);
/// <summary>
/// Threading object for lock
/// </summary>
private object _lock = new object();
/// <summary>
/// Thread to monitor status
/// </summary>
private Thread _watchDirtyFlag;
#endregion
#region Properties
/// <summary>
/// journal file to track changes
/// </summary>
public string Filename
{
get;
private set;
}
#endregion
#region Constructors
/// <summary>
/// default constructor
/// </summary>
/// <param name="filename">Path to filename to write journal file</param>
public JobManager(string filename) : base()
{
ConstructorHelper();
Filename = filename;
}
/// <summary>
/// Parses XML element to recreate the item
/// </summary>
/// <param name="xe">XML element used to create object</param>
public JobManager(XElement xe)
: base(xe)
{
// Checksum validation before doing anything else.
// Will throw exception on failure.
ValidateChecksum(xe);
ConstructorHelper();
string myName = "JobManager";
XElement myself;
try
{
myself = xe.DescendantsAndSelf(myName).First();
}
catch
{
throw new ArgumentException("Attempting to instantiate object, but no relevant information was found in the XML element");
}
Filename = myself.FirstElementValue("Filename");
// Load up all the jobs
XElement[] wq = myself.Descendants("WorkQueue").Elements().ToArray();
foreach (XElement x in wq)
{
try
{
IJob blarg = (IJob)Activator.CreateInstance(typeof(T), x);
if (blarg != null)
WorkQueue.Enqueue((JobBase)blarg);
}
catch
{ }
}
}
/// <summary>
/// Helper for common constructing
/// </summary>
private void ConstructorHelper()
{
// need to wait for the base constructor to finish before attempting to
// hook events there
base.QueueChanged += new EventHandler(JobManager_QueueChanged);
base.HookQueueChangedEvents();
_watchDirtyFlag = new Thread(WatchDirtyFlag);
_watchDirtyFlag.Start();
}
#endregion
#region Methods
/// <summary>
/// Saves the state of the JobManager to Filename using XML
/// </summary>
public void Save()
{
TextWriter writer = null;
try
{
writer = new StreamWriter(Filename);
writer.Write(this.ToXElement());
}
catch (Exception ex)
{
throw ex;
}
finally
{
writer.Close();
}
}
/// <summary>
/// Loads the filename and attempts to parse it as XML to
/// create a JobManager. Pass the type of job to manage.
/// </summary>
/// <param name="filename">File storing the JobManager as XML</param>
/// <returns>JobManager with values loaded from file</returns>
public static JobManager<T> Load(string filename)
{
if (filename == "")
throw new ArgumentException("Can not load JobManager: Filename not set");
TextReader reader = null;
string text;
try
{
reader = new StreamReader(filename);
text = reader.ReadToEnd();
}
catch (Exception ex)
{
throw ex;
}
finally
{
reader.Close();
}
XElement loadFrom = null;
try
{
loadFrom = XElement.Parse(text);
}
catch //(Exception ex)
{
//throw ex;
loadFrom = new XElement("empty");
}
JobManager<T> output = new JobManager<T>(loadFrom);
output.Filename = filename;
return output;
}
/// <summary>
/// Converts the item to an XML element
/// </summary>
/// <returns></returns>
new public XElement ToXElement()
{
XElement bxe = base.ToXElement();
//string myName = this.GetType().Name;
string myName = "JobManager";
XElement wq = new XElement("WorkQueue");
foreach (IWorkSerializable t in WorkQueue.ToArray())
{
XElement addee = t.ToXElement();
wq.Add(addee);
}
bxe.Add(wq);
XElement xe = new XElement(myName,
bxe,
new XElement("Filename", Filename)
);
xe.Add(
new XElement(_hashHeading, Generic.ComputeSha256Hash(xe.ToString()))
);
return xe;
}
/// <summary>
/// Validates the checksum for the current xelement. Throws exceptions on failure
/// </summary>
/// <param name="xe">XML tree of the itme to validate</param>
private void ValidateChecksum(XElement xe)
{
XElement checksum;
try
{
checksum = xe.DescendantsAndSelf(_hashHeading).First();
}
catch (Exception ex)
{
throw new Exception("Unable to find checksum node", ex);
}
XElement withoutChecksum = new XElement(xe);
withoutChecksum.Elements(_hashHeading).Remove();
string computedChecksum = Generic.ComputeSha256Hash(withoutChecksum.ToString());
if (computedChecksum != checksum.Value)
throw new Exception("Checksum from XML element and checksum from contents of XML element do not match: \n" + xe.Value);
}
/// <summary>
/// This thread will watch the dirty flag, which is set everytime the
/// queues are changed. Every _minTimeToSave the flag is checked, and
/// if the flag is set, Save() is called.
/// </summary>
private void WatchDirtyFlag()
{
while (true)
{
// sleep until there's something to update
while (_isDirty == false)
{
Thread.Sleep(_minTimeToSave);
}
// but don't update too frequently
if (DateTime.Now.Subtract(_lastSaveTime) > _minTimeToSave)
{
// save first ...
this.Save();
// then update items ...
_lastSaveTime = DateTime.Now;
lock (_lock)
{
_isDirty = false;
}
}
}
}
#endregion
#region Event Handlers
/// <summary>
/// updates flag when any underlying queue changes
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void JobManager_QueueChanged(object sender, EventArgs e)
{
lock (_lock)
{
_isDirty = true;
}
}
#endregion
}
注意事项:
- 这是不完整的代码,如果有人试图复制这个(缺少基础类和事物)
- 定期(二进制)和XML序列化从未正常工作,所以我实现了将对象保存为XML的自定义序列化。这是
ToXElement()
方法,以及需要参数XElement
的构造函数。 - 在序列化的顶层(JobManager)中包含校验和(SHA-256)。当从
XElement
实例化新对象时,将保存的序列化对象的校验和与文件中的校验和进行比较。 - 有一种静态方法
.Load(file)
它通过读取文件并尝试反序列化内容来返回新的JobManager对象。 - 此处未显示自定义ConcurrentQueue类。这是MSDN ConcurrentQueue类的封装程序,但添加了一个事件来通知队列更改的时间。
- 这个JobManager类实现了一个前面提到的ConcurrentQueue的基类;这些队列变化事件在构造辅助
- 迷上当事件触发时,JobManager设置一个标志
_isDirty
- 的JobManager开始于实例化线程,监控
_isDirty
标志。大部分时间都用于睡眠,但如果至少已经过了_minTimeToSave
,则JobManager的内容将被序列化为磁盘。这应该使JobManager不能太频繁地写入磁盘。
。著名的顾虑没有解决:
- 在线程真正正确的解决方案看
_isDirty
标志? - JobManager(单个线程)管理包含任务的作业(一次一个,但是不同的线程);没有类到基类的同步锁定状态,而序列化
- 老已完成作业被序列化到磁盘,然后重装