vtkStreamingDemandDrivenPipeline是只支持更新管道中部分数据集的执行程序。这是老式VTK 4.x管道提供的管道更新样式。该执行器不总是更新整个数据集,而是支持请求数据块或子数据块。
1.类图结构
vtkStreamingDemandDrivenPipeline类的继承关系图:
vtkStreamingDemandDrivenPipeline类的协作图:
vtkStreamingDemandDrivenPipeline Class Reference
2.实现代码
2.1Update()
//------------------------------------------------------------------------------
vtkTypeBool vtkStreamingDemandDrivenPipeline::Update(int port)
{
return this->Update(port, nullptr);
}
//------------------------------------------------------------------------------
vtkTypeBool vtkStreamingDemandDrivenPipeline::Update(int port, vtkInformationVector* requests)
{
if (!this->UpdateInformation())
{
return 0;
}
int numPorts = this->Algorithm->GetNumberOfOutputPorts();
if (requests)
{
vtkInformationVector* outInfoVec = this->GetOutputInformation();
for (int i = 0; i < numPorts; i++)
{
vtkInformation* outInfo = outInfoVec->GetInformationObject(i);
vtkInformation* req = requests->GetInformationObject(i);
if (outInfo && req)
{
outInfo->Append(req);
}
}
}
if (port >= -1 && port < numPorts)
{
int retval = 1;
// some streaming filters can request that the pipeline execute multiple
// times for a single update
//一些流式过滤器可以请求管道为一次更新执行多次
do
{
this->PropagateTime(port);
this->UpdateTimeDependentInformation(port);
retval = retval && this->PropagateUpdateExtent(port);
if (retval && !this->LastPropogateUpdateExtentShortCircuited)
{
retval = retval && this->UpdateData(port);
}
} while (this->ContinueExecuting);
return retval;
}
else
{
return 1;
}
}
因为requests参数为nullputr,所以此处实际上执行代码为:
do
{
this->PropagateTime(port);
this->UpdateTimeDependentInformation(port);
retval = retval && this->PropagateUpdateExtent(port);
if (retval && !this->LastPropogateUpdateExtentShortCircuited)
{
retval = retval && this->UpdateData(port);
}
} while (this->ContinueExecuting);
2.1.1 this->PropagateTime(port)
int vtkStreamingDemandDrivenPipeline::PropagateTime(int outputPort)
{
// The algorithm should not invoke anything on the executive.
//算法不应该调用任何关于执行程序的内容。
if (!this->CheckAlgorithm("PropagateTime", nullptr))
{
return 0;
}
// Range check.
if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
{
vtkErrorMacro("PropagateUpdateTime given output port index "
<< outputPort << " on an algorithm with " << this->Algorithm->GetNumberOfOutputPorts()
<< " output ports.");
return 0;
}
// Setup the request for update extent propagation.
if (!this->UpdateTimeRequest)
{
this->UpdateTimeRequest = vtkInformation::New();
this->UpdateTimeRequest->Set(REQUEST_UPDATE_TIME());
// The request is forwarded upstream through the pipeline.
this->UpdateTimeRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
// Algorithms process this request before it is forwarded.
this->UpdateTimeRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
}
this->UpdateTimeRequest->Set(FROM_OUTPUT_PORT(), outputPort);
// Send the request.
return this->ProcessRequest(
this->UpdateTimeRequest, this->GetInputInformation(), this->GetOutputInformation());
}
UpdateTimeRequest
vtkInformation* UpdateTimeRequest
FROM_OUTPUT_PORT()
信息键,用于存储发出请求的输出端口号。
/**
* Information key to store the output port number from which a
* request is made.
*/
static vtkInformationIntegerKey* FROM_OUTPUT_PORT();
this->ProcessRequest
见2.2节。
此场景的ProcessRequest()调用流程如下图所示:
PropagateTime()中ProcessRequest()的调用流程
2.1.2 this->UpdateTimeDependentInformation(port);
UpdateTimeDependentInformation()参见2.4节。
UpdateTimeDependentInformation()函数本质上也是调用2.2节的ProcessRequest()方法:
this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
this->GetOutputInformation());
与2.1.1 this->PropagateTime(port) 不同的是,2.1.2 this->UpdateTimeDependentInformation(port) :
- UpdateTimeDependentInformation传入的第一个参数为TimeDependentInformationRequest
- PropagateTime传入的第一个参数为UpdateTimeRequest
2.1.3 this->PropagateUpdateExtent(port);
跟2.1.1和2.1.2相同,本质上都是调用2.2的ProcessRequest()方法
this->ProcessRequest(
this->UpdateExtentRequest, this->GetInputInformation(), this->GetOutputInformation());
不同的是:
第一个参数为:UpdateExtentRequest
vtkInformation* UpdateExtentRequest;
2.1.4 this->UpdateData(port);
UpdateData代码实现参见2.6 UpdateData(int outputPort),本质上也是调用2.2节的ProcessRequest()方法。
this->ProcessRequest(
this->DataRequest, this->GetInputInformation(), this->GetOutputInformation());
2.2 ProcessRequest()
vtkTypeBool vtkStreamingDemandDrivenPipeline ::ProcessRequest(
vtkInformation* request, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)
{
// The algorithm should not invoke anything on the executive.
if (!this->CheckAlgorithm("ProcessRequest", request))
{
return 0;
}
// Look for specially supported requests.
if (request->Has(REQUEST_UPDATE_TIME()))
{
int result = 1;
int outputPort = -1;
if (request->Has(FROM_OUTPUT_PORT()))
{
outputPort = request->Get(FROM_OUTPUT_PORT());
}
int N2E = this->Superclass::NeedToExecuteData(outputPort, inInfoVec, outInfoVec);
if (!N2E && outputPort >= 0)
{
vtkInformation* outInfo = outInfoVec->GetInformationObject(outputPort);
vtkDataObject* dataObject = outInfo->Get(vtkDataObject::DATA_OBJECT());
if (outInfo->Has(TIME_DEPENDENT_INFORMATION()))
{
N2E = this->NeedToExecuteBasedOnTime(outInfo, dataObject);
}
else
{
N2E = 0;
}
}
if (N2E)
{
vtkLogF(TRACE, "%s execute-update-time", vtkLogIdentifier(this->Algorithm));
result = this->CallAlgorithm(request, vtkExecutive::RequestUpstream, inInfoVec, outInfoVec);
// Propagate the update extent to all inputs.
if (result)
{
result = this->ForwardUpstream(request);
}
result = 1;
}
return result;
}
outputPort = request->Get(FROM_OUTPUT_PORT())
此处的outputPort值大多数时间为0。
2.2.1 this->Superclass::NeedToExecuteData
class VTKCOMMONEXECUTIONMODEL_EXPORT vtkStreamingDemandDrivenPipeline
: public vtkDemandDrivenPipeline
vtkStreamingDemandDrivenPipeline类的父类是vtkDemandDrivenPipeline,因此this->Superclass::NeedToExecuteData实质上调用的代码为:
int vtkDemandDrivenPipeline ::NeedToExecuteData(
int outputPort, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)
{
// If the filter parameters or input have been modified since the
// last execution then we must execute. This is a shortcut for most
// filters since all outputs will have the same UpdateTime. This
// also handles the case in which there are no outputs.
//如果自上次执行以来过滤器参数或输入已被修改,则我们必须执行。这是大多数过滤器的快捷方式,因为所有输出都有相同的更新时间。这也适用于没有输出的情况。
if (this->PipelineMTime > this->DataTime.GetMTime())
{
return 1;
}
if (outputPort >= 0)
{
// If the output on the port making the request is out-of-date
// then we must execute.
vtkInformation* info = outInfoVec->GetInformationObject(outputPort);
vtkDataObject* data = info->Get(vtkDataObject::DATA_OBJECT());
if (!data || this->PipelineMTime > data->GetUpdateTime())
{
return 1;
}
}
else
{
// No port is specified. Check all ports.
for (int i = 0; i < this->Algorithm->GetNumberOfOutputPorts(); ++i)
{
if (this->NeedToExecuteData(i, inInfoVec, outInfoVec))
{
return 1;
}
}
}
// We do not need to execute.
return 0;
}
outputPort为0
因为参数outputPort为0,所以 int vtkDemandDrivenPipeline ::NeedToExecuteData()中实际调用的代码为:
if (outputPort >= 0)
{
// If the output on the port making the request is out-of-date
// then we must execute.
vtkInformation* info = outInfoVec->GetInformationObject(outputPort);
vtkDataObject* data = info->Get(vtkDataObject::DATA_OBJECT());
if (!data || this->PipelineMTime > data->GetUpdateTime())
{
return 1;
}
}
outputPort不为0
// No port is specified. Check all ports.
for (int i = 0; i < this->Algorithm->GetNumberOfOutputPorts(); ++i)
{
if (this->NeedToExecuteData(i, inInfoVec, outInfoVec))
{
return 1;
}
}
循环递归调用NeedToExecuteData()方法。
this->Algorithm为:
参考vtkStreamingDemandDrivenPipeline类的协作图,可以知道vtkStreamingDemandDrivenPipeline类的Algorithm变量继承子其父类vtkDemandDrivenPipeline类的父类vtkExecutive类的Algorithm变量,
该变量定义如下:
// The algorithm managed by this executive.这个执行程序管理的算法。
vtkAlgorithm* Algorithm;
2.2.2 vtkInformation* outInfo = outInfoVec->GetInformationObject(outputPort);
此处的outInforVec来自vtkStreamingDemandDrivenPipeline ::ProcessRequest()方法的第四个参数 vtkInformationVector* outInfoVec。
在vtkStreamingDemandDrivenPipeline::UpdateTimeDependentInformation(int port)中有调用ProcessRequest()语句:
// Send the request.
return this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
this->GetOutputInformation());
此处ProcessRequest()方法的第四个参数为: this->GetOutputInformation()。
根据vtkStreamingDemandDrivenPipeline类的协作图可以知道, this->GetOutputInformation()应该调用vtkStreamingDemandDrivenPipeline类的祖先类vtkExecutive类的GetOutputInformation():
参考Paraview源码解析6:vtkExecutive类可以知道vtkExecutive类的GetOutputInformation()返回OutputInformation变量,该变量为算法的每个输出端口存储一个信息对象vtkInformation。
vtkInformation* vtkInformationVector::GetInformationObject(int index)
{
if (index >= 0 && index < this->NumberOfInformationObjects)
{
return this->Internal->Vector[index];
}
return nullptr;
}
** this->Internal**
// Internal implementation details.
vtkInformationVectorInternals* Internal;
class vtkInformationVectorInternals
{
public:
std::vector<vtkInformation*> Vector;
~vtkInformationVectorInternals();
};
//------------------------------------------------------------------------------
vtkInformationVectorInternals::~vtkInformationVectorInternals()
{
// Delete all the information objects.
for (std::vector<vtkInformation*>::iterator i = this->Vector.begin(); i != this->Vector.end();
++i)
{
if (vtkInformation* info = *i)
{
info->Delete();
}
}
}
2.2.3 N2E = this->NeedToExecuteBasedOnTime(outInfo, dataObject);
见2.3节
2.2.4 result = this->CallAlgorithm(request, vtkExecutive::RequestUpstream, inInfoVec, outInfoVec);
CallAlgorithm()是核心代码
int vtkExecutive::CallAlgorithm(vtkInformation* request, int direction,
vtkInformationVector** inInfo, vtkInformationVector* outInfo)
{
// Copy default information in the direction of information flow.
this->CopyDefaultInformation(request, direction, inInfo, outInfo);
// Invoke the request on the algorithm.
this->InAlgorithm = 1;
int result = this->Algorithm->ProcessRequest(request, inInfo, outInfo);
this->InAlgorithm = 0;
// If the algorithm failed report it now.
if (!result)
{
vtkErrorMacro("Algorithm " << this->Algorithm->GetClassName() << "(" << this->Algorithm
<< ") returned failure for request: " << *request);
}
return result;
}
参数request:
// Setup the request for update extent propagation.
if (!this->UpdateTimeRequest)
{
this->UpdateTimeRequest = vtkInformation::New();
this->UpdateTimeRequest->Set(REQUEST_UPDATE_TIME());
// The request is forwarded upstream through the pipeline.
this->UpdateTimeRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
// Algorithms process this request before it is forwarded.
this->UpdateTimeRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
}
this->UpdateTimeRequest->Set(FROM_OUTPUT_PORT(), outputPort);
参数inInofo
参数inInofo其实是
vtkTypeBool vtkStreamingDemandDrivenPipeline ::ProcessRequest(
vtkInformation* request, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)的第个个参数vtkInformationVector** inInfoVec。
ProcessRequest()方法在vtkStreamingDemandDrivenPipeline::PropagateTime(int outputPort)中被调用:
this->ProcessRequest(
this->UpdateTimeRequest, this->GetInputInformation(), this->GetOutputInformation());
因此参数inInfo的值实际上是:
this->GetInputInformation()
参数outInfo:
同inInfo类似,outInfo的参数值实际上是:
this->GetOutputInformation()
GetInputInformation()和GetOutputInformation()都是vtkStreamingDemandDrivenPipeline类的祖先类vtkExcutive类的方法。
具体实现参见:Paraview源码解析6:vtkExecutive类
int result = this->Algorithm->ProcessRequest(request, inInfo, outInfo);
Algorithm变量定义如下:
vtkAlgorithm* Algorithm;
vtkAlgortithm::ProcessRequest()方法具体实现参见:Paraview源码解析3:vtkAlgorithm类2.1节。
vtkAlgorithm类的ProcessRequest()是一个虚函数,每个子类会对该方法进行重写。
vtkArrowSource类中的update()方法调用vtkPolyDataAlgorithm::ProcessRequest()方法。
vtkPolyDataAlgorithm::ProcessRequest()具体实现参见:Paraview源码解析5:vtkPolyDataAlgorithm类中的2.1节。
因为此处参数request没有:
- vtkDemandDrivenPipeline::REQUEST_INFORMATION()
- vtkStreamingDemandDrivenPipeline::REQUEST_UPDATE_EXTENT()
- vtkDemandDrivenPipeline::REQUEST_DATA()
vtkPolyDataAlgorithm::ProcessRequest()中实际上只是调用:
return this->Superclass::ProcessRequest(request, inputVector, outputVector);
即调用Paraview源码解析3:vtkAlgorithm类2.2节中的ProcessRequest()方法。
2.2.5 result = this->ForwardUpstream(request);
参见2.7 ForwardUpstream()方法
2.3 NeedToExecuteBasedOnTime()
int vtkStreamingDemandDrivenPipeline::NeedToExecuteBasedOnTime(
vtkInformation* outInfo, vtkDataObject* dataObject)
{
// If this algorithm does not provide time information and another
// algorithm upstream did not provide time information, we do not
// re-execute even if the time request changed.
if (!outInfo->Has(TIME_RANGE()))
{
return 0;
}
vtkInformation* dataInfo = dataObject->GetInformation();
// if we are requesting a particular update time index, check
// if we have the desired time index.
if (outInfo->Has(UPDATE_TIME_STEP()))
{
if (!dataInfo->Has(vtkDataObject::DATA_TIME_STEP()))
{
return 1;
}
double ustep = outInfo->Get(UPDATE_TIME_STEP());
// First check if time request is the same as previous time request.
// If the previous update request did not correspond to an existing
// time step and the reader chose a time step with it's own logic, the
// data time step will be different than the request. If the same time
// step is requested again, there is no need to re-execute the
// algorithm. We know that it does not have this time step.
if (outInfo->Has(PREVIOUS_UPDATE_TIME_STEP()))
{
if (outInfo->Has(UPDATE_TIME_STEP()))
{
bool match = true;
double pstep = outInfo->Get(PREVIOUS_UPDATE_TIME_STEP());
if (pstep != ustep)
{
match = false;
}
if (match)
{
return 0;
}
}
}
int hasdsteps = dataInfo->Has(vtkDataObject::DATA_TIME_STEP());
int hasusteps = dataInfo->Has(UPDATE_TIME_STEP());
double dstep = dataInfo->Get(vtkDataObject::DATA_TIME_STEP());
if ((hasdsteps && !hasusteps) || (!hasdsteps && hasusteps))
{
return 1;
}
if (dstep != ustep)
{
return 1;
}
}
return 0;
}
2.4 UpdateTimeDependentInformation
int vtkStreamingDemandDrivenPipeline::UpdateTimeDependentInformation(int port)
{
// The algorithm should not invoke anything on the executive.
if (!this->CheckAlgorithm("UpdateMetaInformation", nullptr))
{
return 0;
}
// Setup the request for information.
if (!this->TimeDependentInformationRequest)
{
this->TimeDependentInformationRequest = vtkInformation::New();
this->TimeDependentInformationRequest->Set(REQUEST_TIME_DEPENDENT_INFORMATION());
// The request is forwarded upstream through the pipeline.
this->TimeDependentInformationRequest->Set(
vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
// Algorithms process this request after it is forwarded.
this->TimeDependentInformationRequest->Set(vtkExecutive::ALGORITHM_AFTER_FORWARD(), 1);
}
this->TimeDependentInformationRequest->Set(FROM_OUTPUT_PORT(), port);
// Send the request.
return this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
this->GetOutputInformation());
}
2.5 PropagateUpdateExtent
int vtkStreamingDemandDrivenPipeline::PropagateUpdateExtent(int outputPort)
{
// The algorithm should not invoke anything on the executive.
if (!this->CheckAlgorithm("PropagateUpdateExtent", nullptr))
{
return 0;
}
// Range check.
if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
{
vtkErrorMacro("PropagateUpdateExtent given output port index "
<< outputPort << " on an algorithm with " << this->Algorithm->GetNumberOfOutputPorts()
<< " output ports.");
return 0;
}
// Setup the request for update extent propagation.
if (!this->UpdateExtentRequest)
{
this->UpdateExtentRequest = vtkInformation::New();
this->UpdateExtentRequest->Set(REQUEST_UPDATE_EXTENT());
// The request is forwarded upstream through the pipeline.
this->UpdateExtentRequest->Set(
vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
// Algorithms process this request before it is forwarded.
this->UpdateExtentRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
}
this->UpdateExtentRequest->Set(FROM_OUTPUT_PORT(), outputPort);
// Send the request.
return this->ProcessRequest(
this->UpdateExtentRequest, this->GetInputInformation(), this->GetOutputInformation());
}
2.6 UpdateData(int outputPort)
int vtkDemandDrivenPipeline::UpdateData(int outputPort)
{
// The algorithm should not invoke anything on the executive.
if (!this->CheckAlgorithm("UpdateData", nullptr))
{
return 0;
}
// Range check.
if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
{
vtkErrorMacro("UpdateData given output port index " << outputPort << " on an algorithm with "
<< this->Algorithm->GetNumberOfOutputPorts()
<< " output ports.");
return 0;
}
// Setup the request for data.
if (!this->DataRequest)
{
this->DataRequest = vtkInformation::New();
this->DataRequest->Set(REQUEST_DATA());
// The request is forwarded upstream through the pipeline.
this->DataRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
// Algorithms process this request after it is forwarded.
this->DataRequest->Set(vtkExecutive::ALGORITHM_AFTER_FORWARD(), 1);
}
// Send the request.
this->DataRequest->Set(FROM_OUTPUT_PORT(), outputPort);
return this->ProcessRequest(
this->DataRequest, this->GetInputInformation(), this->GetOutputInformation());
}
2.7 ForwardUpstream(vtkInformation* request)
int vtkExecutive::ForwardUpstream(vtkInformation* request)
{
// Do not forward upstream if the input is shared with another
// executive.
if (this->SharedInputInformation)
{
return 1;
}
if (!this->Algorithm->ModifyRequest(request, BeforeForward))
{
return 0;
}
// Forward the request upstream through all input connections.
int result = 1;
for (int i = 0; i < this->GetNumberOfInputPorts(); ++i)
{
int nic = this->Algorithm->GetNumberOfInputConnections(i);
vtkInformationVector* inVector = this->GetInputInformation()[i];
for (int j = 0; j < nic; ++j)
{
vtkInformation* info = inVector->GetInformationObject(j);
// Get the executive producing this input. If there is none, then
// it is a nullptr input.
vtkExecutive* e;
int producerPort;
vtkExecutive::PRODUCER()->Get(info, e, producerPort);
if (e)
{
int port = request->Get(FROM_OUTPUT_PORT());
request->Set(FROM_OUTPUT_PORT(), producerPort);
if (!e->ProcessRequest(request, e->GetInputInformation(), e->GetOutputInformation()))
{
result = 0;
}
request->Set(FROM_OUTPUT_PORT(), port);
}
}
}
if (!this->Algorithm->ModifyRequest(request, AfterForward))
{
return 0;
}
return result;
}
暂无评论内容