Paraview源码解析4:vtkStreamingDemandDrivenPipeline类

vtkStreamingDemandDrivenPipeline是只支持更新管道中部分数据集的执行程序。这是老式VTK 4.x管道提供的管道更新样式。该执行器不总是更新整个数据集,而是支持请求数据块或子数据块。

1.类图结构

vtkStreamingDemandDrivenPipeline类的继承关系图:
在这里插入图片描述
vtkStreamingDemandDrivenPipeline类的协作图:
在这里插入图片描述

vtkStreamingDemandDrivenPipeline Class Reference

2.实现代码

2.1Update()

//------------------------------------------------------------------------------
// Convenience overload: forwards to Update(port, requests) with a null
// requests vector, so no extra per-port request information is appended.
vtkTypeBool vtkStreamingDemandDrivenPipeline::Update(int port)
{
  return this->Update(port, nullptr);
}

//------------------------------------------------------------------------------
// Runs the full update sequence for one output port (or all ports when
// port is -1): information, time, time-dependent information, update extent
// and finally data.
//
// port      output port index; values outside [-1, number of output ports)
//           make the call return 1 without running the sequence.
// requests  optional per-port information objects that are appended to the
//           matching output information objects before the sequence runs.
//
// Returns 0 when UpdateInformation() fails or when one of the
// extent/data passes fails, and 1 otherwise.
vtkTypeBool vtkStreamingDemandDrivenPipeline::Update(int port, vtkInformationVector* requests)
{
  // Nothing else is attempted unless UpdateInformation() succeeds.
  if (!this->UpdateInformation())
  {
    return 0;
  }

  int portCount = this->Algorithm->GetNumberOfOutputPorts();

  // Append each caller-supplied request object onto the output information
  // of the port with the same index.
  if (requests)
  {
    vtkInformationVector* outputs = this->GetOutputInformation();
    for (int idx = 0; idx < portCount; ++idx)
    {
      vtkInformation* target = outputs->GetInformationObject(idx);
      vtkInformation* extra = requests->GetInformationObject(idx);
      if (!target || !extra)
      {
        continue;
      }
      target->Append(extra);
    }
  }

  // An out-of-range port is reported as success without doing any work.
  if (port < -1 || port >= portCount)
  {
    return 1;
  }

  // Streaming filters may ask for several executions per update; the loop
  // repeats for as long as ContinueExecuting stays set.
  int status = 1;
  do
  {
    this->PropagateTime(port);
    this->UpdateTimeDependentInformation(port);

    // Once a pass has failed, the extent pass is no longer invoked.
    if (status)
    {
      status = this->PropagateUpdateExtent(port) ? 1 : 0;
    }

    // The data pass is skipped when the extent pass was short-circuited.
    if (status && !this->LastPropogateUpdateExtentShortCircuited)
    {
      status = this->UpdateData(port) ? 1 : 0;
    }
  } while (this->ContinueExecuting);

  return status;
}

因为requests参数为nullptr,所以此处实际上执行的代码为:

 do
    {
      this->PropagateTime(port);
      this->UpdateTimeDependentInformation(port);
      retval = retval && this->PropagateUpdateExtent(port);
      if (retval && !this->LastPropogateUpdateExtentShortCircuited)
      {
        retval = retval && this->UpdateData(port);
      }
    } while (this->ContinueExecuting);

2.1.1 this->PropagateTime(port)

// Sends a REQUEST_UPDATE_TIME request through ProcessRequest() on behalf of
// the given output port.
//
// outputPort  index of the requesting output port; -1 is accepted, anything
//             else outside [0, number of output ports) is rejected.
//
// Returns 0 when CheckAlgorithm() fails or the port is out of range,
// otherwise the result of ProcessRequest().
int vtkStreamingDemandDrivenPipeline::PropagateTime(int outputPort)
{
  // The algorithm should not invoke anything on the executive.
  if (!this->CheckAlgorithm("PropagateTime", nullptr))
  {
    return 0;
  }

  // Range check.
  // NOTE(review): the message text below says "PropagateUpdateTime" although
  // this method is named PropagateTime.
  if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
  {
    vtkErrorMacro("PropagateUpdateTime given output port index "
      << outputPort << " on an algorithm with " << this->Algorithm->GetNumberOfOutputPorts()
      << " output ports.");
    return 0;
  }

  // Set up the request for update time propagation. The request object is
  // created on first use and reused by later calls.
  if (!this->UpdateTimeRequest)
  {
    this->UpdateTimeRequest = vtkInformation::New();
    this->UpdateTimeRequest->Set(REQUEST_UPDATE_TIME());
    // The request is forwarded upstream through the pipeline.
    this->UpdateTimeRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
    // Algorithms process this request before it is forwarded.
    this->UpdateTimeRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
  }

  // Record which output port the request originates from (set on every call).
  this->UpdateTimeRequest->Set(FROM_OUTPUT_PORT(), outputPort);

  // Send the request.
  return this->ProcessRequest(
    this->UpdateTimeRequest, this->GetInputInformation(), this->GetOutputInformation());
}

UpdateTimeRequest

vtkInformation* UpdateTimeRequest

FROM_OUTPUT_PORT()
信息键,用于存储发出请求的输出端口号。

  /**
   * Information key to store the output port number from which a
   * request is made.
   */
  static vtkInformationIntegerKey* FROM_OUTPUT_PORT();

this->ProcessRequest
见2.2节。
此场景的ProcessRequest()调用流程如下图所示:
在这里插入图片描述

PropagateTime()中ProcessRequest()的调用流程

2.1.2 this->UpdateTimeDependentInformation(port);

UpdateTimeDependentInformation()参见2.4节。
UpdateTimeDependentInformation()函数本质上也是调用2.2节的ProcessRequest()方法:

this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
    this->GetOutputInformation());

与2.1.1 this->PropagateTime(port)不同的是,2.1.2 this->UpdateTimeDependentInformation(port)传入ProcessRequest()的第一个参数不同:

  • UpdateTimeDependentInformation传入的第一个参数为TimeDependentInformationRequest
  • PropagateTime传入的第一个参数为UpdateTimeRequest

2.1.3 this->PropagateUpdateExtent(port);

跟2.1.1和2.1.2相同,本质上都是调用2.2的ProcessRequest()方法

this->ProcessRequest(
    this->UpdateExtentRequest, this->GetInputInformation(), this->GetOutputInformation());

不同的是:
第一个参数为:UpdateExtentRequest

 vtkInformation* UpdateExtentRequest;

2.1.4 this->UpdateData(port);

UpdateData代码实现参见2.6 UpdateData(int outputPort),本质上也是调用2.2节的ProcessRequest()方法。

this->ProcessRequest(
    this->DataRequest, this->GetInputInformation(), this->GetOutputInformation());

2.2 ProcessRequest()

vtkTypeBool vtkStreamingDemandDrivenPipeline ::ProcessRequest(
  vtkInformation* request, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)
{
  // The algorithm should not invoke anything on the executive.
  if (!this->CheckAlgorithm("ProcessRequest", request))
  {
    return 0;
  }

  // Look for specially supported requests.
  if (request->Has(REQUEST_UPDATE_TIME()))
  {
    int result = 1;
    int outputPort = -1;
    if (request->Has(FROM_OUTPUT_PORT()))
    {
      outputPort = request->Get(FROM_OUTPUT_PORT());
    }

    int N2E = this->Superclass::NeedToExecuteData(outputPort, inInfoVec, outInfoVec);
    if (!N2E && outputPort >= 0)
    {
      vtkInformation* outInfo = outInfoVec->GetInformationObject(outputPort);
      vtkDataObject* dataObject = outInfo->Get(vtkDataObject::DATA_OBJECT());
      if (outInfo->Has(TIME_DEPENDENT_INFORMATION()))
      {
        N2E = this->NeedToExecuteBasedOnTime(outInfo, dataObject);
      }
      else
      {
        N2E = 0;
      }
    }
    if (N2E)
    {
      vtkLogF(TRACE, "%s execute-update-time", vtkLogIdentifier(this->Algorithm));
      result = this->CallAlgorithm(request, vtkExecutive::RequestUpstream, inInfoVec, outInfoVec);
      // Propagate the update extent to all inputs.
      if (result)
      {
        result = this->ForwardUpstream(request);
      }
      result = 1;
    }
    return result;
  }

outputPort = request->Get(FROM_OUTPUT_PORT())
此处的outputPort值大多数时间为0。

2.2.1 this->Superclass::NeedToExecuteData

class VTKCOMMONEXECUTIONMODEL_EXPORT vtkStreamingDemandDrivenPipeline
 : public vtkDemandDrivenPipeline

vtkStreamingDemandDrivenPipeline类的父类是vtkDemandDrivenPipeline,因此this->Superclass::NeedToExecuteData实质上调用的代码为:

int vtkDemandDrivenPipeline ::NeedToExecuteData(
 int outputPort, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)
{
 // If the filter parameters or input have been modified since the
 // last execution then we must execute.  This is a shortcut for most
 // filters since all outputs will have the same UpdateTime.  This
 // also handles the case in which there are no outputs.
 //如果自上次执行以来过滤器参数或输入已被修改,则我们必须执行。这是大多数过滤器的快捷方式,因为所有输出都有相同的更新时间。这也适用于没有输出的情况。
 if (this->PipelineMTime > this->DataTime.GetMTime())
 {
   return 1;
 }

 if (outputPort >= 0)
 {
   // If the output on the port making the request is out-of-date
   // then we must execute.
   vtkInformation* info = outInfoVec->GetInformationObject(outputPort);
   vtkDataObject* data = info->Get(vtkDataObject::DATA_OBJECT());
   if (!data || this->PipelineMTime > data->GetUpdateTime())
   {
     return 1;
   }
 }
 else
 {
   // No port is specified.  Check all ports.
   for (int i = 0; i < this->Algorithm->GetNumberOfOutputPorts(); ++i)
   {
     if (this->NeedToExecuteData(i, inInfoVec, outInfoVec))
     {
       return 1;
     }
   }
 }

 // We do not need to execute.
 return 0;
}

outputPort大于等于0
因为参数outputPort为0,满足outputPort >= 0,所以 int vtkDemandDrivenPipeline ::NeedToExecuteData()中实际调用的代码为:

  if (outputPort >= 0)
 {
   // If the output on the port making the request is out-of-date
   // then we must execute.
   vtkInformation* info = outInfoVec->GetInformationObject(outputPort);
   vtkDataObject* data = info->Get(vtkDataObject::DATA_OBJECT());
   if (!data || this->PipelineMTime > data->GetUpdateTime())
   {
     return 1;
   }
 }

outputPort小于0(例如-1,表示未指定端口)

 // No port is specified.  Check all ports.
   for (int i = 0; i < this->Algorithm->GetNumberOfOutputPorts(); ++i)
   {
     if (this->NeedToExecuteData(i, inInfoVec, outInfoVec))
     {
       return 1;
     }
   }

循环递归调用NeedToExecuteData()方法。
this->Algorithm为:
参考vtkStreamingDemandDrivenPipeline类的协作图,可以知道vtkStreamingDemandDrivenPipeline类的Algorithm变量继承自其父类vtkDemandDrivenPipeline类的父类vtkExecutive类的Algorithm变量,
该变量定义如下:

  // The algorithm managed by this executive.这个执行程序管理的算法。
vtkAlgorithm* Algorithm;

2.2.2 vtkInformation* outInfo = outInfoVec->GetInformationObject(outputPort);

此处的outInfoVec来自vtkStreamingDemandDrivenPipeline ::ProcessRequest()方法的第三个参数 vtkInformationVector* outInfoVec。
在vtkStreamingDemandDrivenPipeline::UpdateTimeDependentInformation(int port)中有调用ProcessRequest()语句:

// Send the request.
 return this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
   this->GetOutputInformation());

此处ProcessRequest()方法的第三个参数为: this->GetOutputInformation()。
根据vtkStreamingDemandDrivenPipeline类的协作图可以知道, this->GetOutputInformation()应该调用vtkStreamingDemandDrivenPipeline类的祖先类vtkExecutive类的GetOutputInformation():
参考Paraview源码解析6:vtkExecutive类可以知道vtkExecutive类的GetOutputInformation()返回OutputInformation变量,该变量为算法的每个输出端口存储一个信息对象vtkInformation。

// Returns the information object stored at the given index, or nullptr when
// the index is negative or not below NumberOfInformationObjects.
vtkInformation* vtkInformationVector::GetInformationObject(int index)
{
 const bool outOfRange = index < 0 || index >= this->NumberOfInformationObjects;
 return outOfRange ? nullptr : this->Internal->Vector[index];
}

**this->Internal**

  // Internal implementation details.
 vtkInformationVectorInternals* Internal;
// Storage helper that keeps raw vtkInformation pointers in a std::vector.
class vtkInformationVectorInternals
{
public:
 // The stored information object pointers; entries may be null.
 std::vector<vtkInformation*> Vector;

 // Destructor; its definition is not part of the class body.
 ~vtkInformationVectorInternals();
};

//------------------------------------------------------------------------------
// Calls Delete() on every non-null information object held in Vector.
vtkInformationVectorInternals::~vtkInformationVectorInternals()
{
 for (vtkInformation* entry : this->Vector)
 {
   if (!entry)
   {
     // Empty slot, nothing to delete.
     continue;
   }
   entry->Delete();
 }
}

2.2.3 N2E = this->NeedToExecuteBasedOnTime(outInfo, dataObject);

见2.3节

2.2.4 result = this->CallAlgorithm(request, vtkExecutive::RequestUpstream, inInfoVec, outInfoVec);

CallAlgorithm()是核心代码

// Hands a request to the managed algorithm.
//
// request    the request being processed.
// direction  direction of information flow, passed to CopyDefaultInformation().
// inInfo     input information vectors, passed through to the algorithm.
// outInfo    output information vector, passed through to the algorithm.
//
// Returns whatever Algorithm->ProcessRequest() returned; a zero result is
// also reported through vtkErrorMacro.
int vtkExecutive::CallAlgorithm(vtkInformation* request, int direction,
  vtkInformationVector** inInfo, vtkInformationVector* outInfo)
{
  // Copy default information in the direction of information flow.
  this->CopyDefaultInformation(request, direction, inInfo, outInfo);

  // Invoke the request on the algorithm. InAlgorithm is set to 1 only for
  // the duration of the call and reset to 0 right after it returns.
  this->InAlgorithm = 1;
  int result = this->Algorithm->ProcessRequest(request, inInfo, outInfo);
  this->InAlgorithm = 0;

  // If the algorithm failed report it now.
  if (!result)
  {
    vtkErrorMacro("Algorithm " << this->Algorithm->GetClassName() << "(" << this->Algorithm
                               << ") returned failure for request: " << *request);
  }

  return result;
}

参数request:

 // Setup the request for update extent propagation.
  if (!this->UpdateTimeRequest)
  {
    this->UpdateTimeRequest = vtkInformation::New();
    this->UpdateTimeRequest->Set(REQUEST_UPDATE_TIME());
    // The request is forwarded upstream through the pipeline.
    this->UpdateTimeRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
    // Algorithms process this request before it is forwarded.
    this->UpdateTimeRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
  }

  this->UpdateTimeRequest->Set(FROM_OUTPUT_PORT(), outputPort);

参数inInfo:
参数inInfo其实是
vtkTypeBool vtkStreamingDemandDrivenPipeline ::ProcessRequest(
vtkInformation* request, vtkInformationVector** inInfoVec, vtkInformationVector* outInfoVec)的第二个参数vtkInformationVector** inInfoVec。

ProcessRequest()方法在vtkStreamingDemandDrivenPipeline::PropagateTime(int outputPort)中被调用:

this->ProcessRequest(
    this->UpdateTimeRequest, this->GetInputInformation(), this->GetOutputInformation());

因此参数inInfo的值实际上是:

this->GetInputInformation()

参数outInfo:
同inInfo类似,outInfo的参数值实际上是:

this->GetOutputInformation()

GetInputInformation()和GetOutputInformation()都是vtkStreamingDemandDrivenPipeline类的祖先类vtkExecutive类的方法。
具体实现参见:Paraview源码解析6:vtkExecutive类

int result = this->Algorithm->ProcessRequest(request, inInfo, outInfo);
Algorithm变量定义如下:

vtkAlgorithm* Algorithm;

vtkAlgorithm::ProcessRequest()方法具体实现参见:Paraview源码解析3:vtkAlgorithm类2.1节。
vtkAlgorithm类的ProcessRequest()是一个虚函数,每个子类会对该方法进行重写。
vtkArrowSource类中的update()方法调用vtkPolyDataAlgorithm::ProcessRequest()方法。

vtkPolyDataAlgorithm::ProcessRequest()具体实现参见:Paraview源码解析5:vtkPolyDataAlgorithm类中的2.1节。
因为此处参数request没有:

  • vtkDemandDrivenPipeline::REQUEST_INFORMATION()
  • vtkStreamingDemandDrivenPipeline::REQUEST_UPDATE_EXTENT()
  • vtkDemandDrivenPipeline::REQUEST_DATA()

vtkPolyDataAlgorithm::ProcessRequest()中实际上只是调用:

 return this->Superclass::ProcessRequest(request, inputVector, outputVector);

即调用Paraview源码解析3:vtkAlgorithm类2.2节中的ProcessRequest()方法。

2.2.5 result = this->ForwardUpstream(request);

参见2.7 ForwardUpstream()方法

2.3 NeedToExecuteBasedOnTime()

// Decides whether a time request alone requires re-execution.
//
// outInfo     output information carrying TIME_RANGE, UPDATE_TIME_STEP and
//             PREVIOUS_UPDATE_TIME_STEP entries when present.
// dataObject  data object whose information is compared with the request;
//             it is dereferenced unconditionally once TIME_RANGE is present.
//
// Returns 1 when re-execution is needed and 0 otherwise.
int vtkStreamingDemandDrivenPipeline::NeedToExecuteBasedOnTime(
vtkInformation* outInfo, vtkDataObject* dataObject)
{
  // Without a TIME_RANGE entry (neither this algorithm nor one upstream
  // provided time information) a changed time request never forces
  // re-execution.
  if (!outInfo->Has(TIME_RANGE()))
  {
    return 0;
  }

  vtkInformation* producedInfo = dataObject->GetInformation();

  // No particular time step is requested, so there is nothing to compare.
  if (!outInfo->Has(UPDATE_TIME_STEP()))
  {
    return 0;
  }

  // The data carries no time step, so it cannot match the request.
  if (!producedInfo->Has(vtkDataObject::DATA_TIME_STEP()))
  {
    return 1;
  }

  double requestedStep = outInfo->Get(UPDATE_TIME_STEP());

  // A request identical to the previous one does not re-execute. If the
  // earlier request did not correspond to an existing time step and the
  // reader chose one with its own logic, the data time step differs from the
  // request; asking for the same step again cannot produce anything new.
  if (outInfo->Has(PREVIOUS_UPDATE_TIME_STEP()) && outInfo->Has(UPDATE_TIME_STEP()))
  {
    double previousStep = outInfo->Get(PREVIOUS_UPDATE_TIME_STEP());
    if (previousStep == requestedStep)
    {
      return 0;
    }
  }

  int dataHasStep = producedInfo->Has(vtkDataObject::DATA_TIME_STEP());
  int dataHasRequest = producedInfo->Has(UPDATE_TIME_STEP());
  double dataStep = producedInfo->Get(vtkDataObject::DATA_TIME_STEP());

  // Re-execute when exactly one of the two keys is present on the data
  // information.
  if ((dataHasStep != 0) != (dataHasRequest != 0))
  {
    return 1;
  }

  // Otherwise re-execute only when the data's step differs from the request.
  return (dataStep != requestedStep) ? 1 : 0;
}

2.4 UpdateTimeDependentInformation

// Sends a REQUEST_TIME_DEPENDENT_INFORMATION request through
// ProcessRequest() on behalf of the given output port.
//
// port  index of the requesting output port, stored under FROM_OUTPUT_PORT;
//       no range check is performed here.
//
// Returns 0 when CheckAlgorithm() fails, otherwise the result of
// ProcessRequest().
int vtkStreamingDemandDrivenPipeline::UpdateTimeDependentInformation(int port)
{
 // The algorithm should not invoke anything on the executive.
 // NOTE(review): the name passed to CheckAlgorithm is "UpdateMetaInformation",
 // not this method's own name.
 if (!this->CheckAlgorithm("UpdateMetaInformation", nullptr))
 {
   return 0;
 }

 // Set up the request for information. The request object is created on
 // first use and reused by later calls.
 if (!this->TimeDependentInformationRequest)
 {
   this->TimeDependentInformationRequest = vtkInformation::New();
   this->TimeDependentInformationRequest->Set(REQUEST_TIME_DEPENDENT_INFORMATION());
   // The request is forwarded upstream through the pipeline.
   this->TimeDependentInformationRequest->Set(
     vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
   // Algorithms process this request after it is forwarded.
   this->TimeDependentInformationRequest->Set(vtkExecutive::ALGORITHM_AFTER_FORWARD(), 1);
 }

 // Record which output port the request originates from (set on every call).
 this->TimeDependentInformationRequest->Set(FROM_OUTPUT_PORT(), port);

 // Send the request.
 return this->ProcessRequest(this->TimeDependentInformationRequest, this->GetInputInformation(),
   this->GetOutputInformation());
}

2.5 PropagateUpdateExtent

// Sends a REQUEST_UPDATE_EXTENT request through ProcessRequest() on behalf
// of the given output port.
//
// outputPort  index of the requesting output port; -1 is accepted, anything
//             else outside [0, number of output ports) is rejected.
//
// Returns 0 when CheckAlgorithm() fails or the port is out of range,
// otherwise the result of ProcessRequest().
int vtkStreamingDemandDrivenPipeline::PropagateUpdateExtent(int outputPort)
{
 // The algorithm should not invoke anything on the executive.
 if (!this->CheckAlgorithm("PropagateUpdateExtent", nullptr))
 {
   return 0;
 }

 // Range check.
 if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
 {
   vtkErrorMacro("PropagateUpdateExtent given output port index "
     << outputPort << " on an algorithm with " << this->Algorithm->GetNumberOfOutputPorts()
     << " output ports.");
   return 0;
 }

 // Set up the request for update extent propagation. The request object is
 // created on first use and reused by later calls.
 if (!this->UpdateExtentRequest)
 {
   this->UpdateExtentRequest = vtkInformation::New();
   this->UpdateExtentRequest->Set(REQUEST_UPDATE_EXTENT());
   // The request is forwarded upstream through the pipeline.
   this->UpdateExtentRequest->Set(
     vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
   // Algorithms process this request before it is forwarded.
   this->UpdateExtentRequest->Set(vtkExecutive::ALGORITHM_BEFORE_FORWARD(), 1);
 }

 // Record which output port the request originates from (set on every call).
 this->UpdateExtentRequest->Set(FROM_OUTPUT_PORT(), outputPort);

 // Send the request.
 return this->ProcessRequest(
   this->UpdateExtentRequest, this->GetInputInformation(), this->GetOutputInformation());
}

2.6 UpdateData(int outputPort)

// Sends a REQUEST_DATA request through ProcessRequest() on behalf of the
// given output port.
//
// outputPort  index of the requesting output port; -1 is accepted, anything
//             else outside [0, number of output ports) is rejected.
//
// Returns 0 when CheckAlgorithm() fails or the port is out of range,
// otherwise the result of ProcessRequest().
int vtkDemandDrivenPipeline::UpdateData(int outputPort)
{
 // The algorithm should not invoke anything on the executive.
 if (!this->CheckAlgorithm("UpdateData", nullptr))
 {
   return 0;
 }

 // Range check.
 if (outputPort < -1 || outputPort >= this->Algorithm->GetNumberOfOutputPorts())
 {
   vtkErrorMacro("UpdateData given output port index " << outputPort << " on an algorithm with "
                                                       << this->Algorithm->GetNumberOfOutputPorts()
                                                       << " output ports.");
   return 0;
 }

 // Set up the request for data. The request object is created on first use
 // and reused by later calls.
 if (!this->DataRequest)
 {
   this->DataRequest = vtkInformation::New();
   this->DataRequest->Set(REQUEST_DATA());
   // The request is forwarded upstream through the pipeline.
   this->DataRequest->Set(vtkExecutive::FORWARD_DIRECTION(), vtkExecutive::RequestUpstream);
   // Algorithms process this request after it is forwarded.
   this->DataRequest->Set(vtkExecutive::ALGORITHM_AFTER_FORWARD(), 1);
 }

 // Record the originating output port (set on every call), then send the
 // request.
 this->DataRequest->Set(FROM_OUTPUT_PORT(), outputPort);
 return this->ProcessRequest(
   this->DataRequest, this->GetInputInformation(), this->GetOutputInformation());
}

2.7 ForwardUpstream(vtkInformation* request)

// Passes a request to the executive behind every input connection.
//
// request  the request to forward; its FROM_OUTPUT_PORT entry is temporarily
//          replaced by the producer's port for each forwarded call and then
//          restored.
//
// Returns 1 when the input information is shared with another executive
// (nothing is forwarded), 0 when either ModifyRequest() call fails or any
// producer's ProcessRequest() fails, and 1 otherwise.
int vtkExecutive::ForwardUpstream(vtkInformation* request)
{
 // Input shared with another executive: do not forward, report success.
 if (this->SharedInputInformation)
 {
   return 1;
 }

 // The algorithm's ModifyRequest() is consulted before forwarding; a zero
 // result aborts the forward.
 if (!this->Algorithm->ModifyRequest(request, BeforeForward))
 {
   return 0;
 }

 int allSucceeded = 1;
 for (int inPort = 0; inPort < this->GetNumberOfInputPorts(); ++inPort)
 {
   int connectionCount = this->Algorithm->GetNumberOfInputConnections(inPort);
   vtkInformationVector* portInputs = this->GetInputInformation()[inPort];
   for (int conn = 0; conn < connectionCount; ++conn)
   {
     vtkInformation* connInfo = portInputs->GetInformationObject(conn);

     // Look up the executive (and its port) producing this input.
     vtkExecutive* producer;
     int producerPort;
     vtkExecutive::PRODUCER()->Get(connInfo, producer, producerPort);
     if (!producer)
     {
       // No producing executive for this connection; nothing to forward to.
       continue;
     }

     // Swap in the producer's port for the forwarded call and put the
     // previous value back afterwards. A failing producer marks the overall
     // result as failed but the remaining connections are still visited.
     int savedPort = request->Get(FROM_OUTPUT_PORT());
     request->Set(FROM_OUTPUT_PORT(), producerPort);
     if (!producer->ProcessRequest(
           request, producer->GetInputInformation(), producer->GetOutputInformation()))
     {
       allSucceeded = 0;
     }
     request->Set(FROM_OUTPUT_PORT(), savedPort);
   }
 }

 // ModifyRequest() is consulted again after forwarding; a zero result
 // overrides whatever the producers returned.
 if (!this->Algorithm->ModifyRequest(request, AfterForward))
 {
   return 0;
 }

 return allSucceeded;
}

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容