Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions internal/pkg/postprocessor/extractor/object_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package extractor

import (
"fmt"
"strings"

"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/internetarchive/Zeno/pkg/models"
)

// All the supported object storage servers
var ObjectStorageServers = func() (s []string) {
s = append(s, s3CompatibleServers...)
s = append(s, azureServers...)
return s
}()

// IsObjectStorage checks if the response is from an object storage server
func IsObjectStorage(URL *models.URL) bool {
return utils.StringContainsSliceElements(URL.GetResponse().Header.Get("Server"), ObjectStorageServers) &&
strings.Contains(strings.ToLower(URL.GetResponse().Header.Get("Content-Type")), "/xml") // tricky match both application/xml and text/xml
}

// ObjectStorage decides which helper to call based on the object storage server
func ObjectStorage(URL *models.URL) ([]*models.URL, error) {
defer URL.RewindBody()

server := URL.GetResponse().Header.Get("Server")
if utils.StringContainsSliceElements(server, s3CompatibleServers) {
return s3Compatible(URL)
} else if utils.StringContainsSliceElements(server, azureServers) {
return azure(URL)
} else {
return nil, fmt.Errorf("unknown object storage server: %s", server)
}
}
78 changes: 78 additions & 0 deletions internal/pkg/postprocessor/extractor/object_storage_azure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package extractor

import (
"encoding/xml"
"fmt"
"strings"

"github.com/internetarchive/Zeno/internal/pkg/log"
"github.com/internetarchive/Zeno/pkg/models"
)

// Azure Blob Storage
var azureServers = []string{
// Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0
"Windows-Azure-Blob",
// Blob Service Version 1.0 Microsoft-HTTPAPI/2.0
"Blob Service Version",
// emulator, https://github.com/Azure/Azurite
"Azurite-Blob",
}
Comment on lines +12 to +20

@yzqzss yzqzss May 15, 2025

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested Zeno with the azure extracor locally with Azurite and it works. But I haven't run it on a real Azure Blob yet.

Also, I found that some Azure Blobs use the Server: Blob Service Version XXX... header, and I don't know what the difference is between them and Server: Windows-Azure-Blob....

https://en.fofa.info/result?qbase64=aGVhZGVyPSJXaW5kb3dzLUF6dXJlLUJsb2Ii
https://en.fofa.info/result?qbase64=aGVhZGVyPSJCbG9iIFNlcnZpY2UgVmVyc2lvbiI%3D


// AzureBlobEnumerationResults represents the XML structure of an Azure Blob Storage listing
// <https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources>
type AzureBlobEnumerationResults struct {
XMLName xml.Name `xml:"EnumerationResults"`
Prefix string `xml:"Prefix"`
Marker string `xml:"Marker"`
Blobs []AzureBlob `xml:"Blobs>Blob"`
NextMarker string `xml:"NextMarker"`
}

type AzureBlob struct {
Name string `xml:"Name"` // path/to/file.txt, no leading slash
LastModified string `xml:"Properties>Last-Modified"`
Size int64 `xml:"Properties>Content-Length"`
}

var azureLogger = log.NewFieldedLogger(&log.Fields{
"component": "postprocessor.extractor.object_storage_azure",
})

func azure(URL *models.URL) ([]*models.URL, error) {
defer URL.RewindBody()

// Decode XML result
var result AzureBlobEnumerationResults
if err := xml.NewDecoder(URL.GetBody()).Decode(&result); err != nil {
return nil, fmt.Errorf("error decoding AzureBlobEnumerationResults XML: %w", err)
}

reqURL := URL.GetRequest().URL

var outlinks []string

if result.NextMarker != "" {
nextURL := *reqURL
q := nextURL.Query()
q.Set("marker", result.NextMarker)
nextURL.RawQuery = q.Encode()
outlinks = append(outlinks, nextURL.String())
}

// Build base url for files
baseURL := *reqURL
baseURL.RawQuery = ""
baseURL.ForceQuery = false

for _, blob := range result.Blobs {
if strings.HasPrefix(blob.Name, "/") {
azureLogger.Warn("invalid blob name: it starts with a leading slash", "blob_name", blob.Name)
continue
}
fileURL := baseURL.JoinPath(blob.Name)
outlinks = append(outlinks, fileURL.String())
}

return toURLs(outlinks), nil
}
74 changes: 74 additions & 0 deletions internal/pkg/postprocessor/extractor/object_storage_azure_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package extractor

import (
"net/http"
"strings"
"testing"
)

func TestAzure(t *testing.T) {
t.Run("Valid XML with single object and nextMarker", func(t *testing.T) {
xmlBody := `<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<EnumerationResults ContainerName="zeno">
<Prefix />
<Marker>dir/azure_files/test_10.txt</Marker>
<MaxResults>1</MaxResults>
<Blobs>
<Blob>
<Name>dir/azure_files/test_100.txt</Name>
<Properties>
<Creation-Time>Thu, 15 May 2025 08:02:20 GMT</Creation-Time>
<Last-Modified>Thu, 15 May 2025 08:02:20 GMT</Last-Modified>
<Etag>0x202A0424CF3AFA0</Etag>
<Content-Length>4</Content-Length>
<Content-Type>text/plain</Content-Type>
<Content-MD5>kZ0ReVbTE1xMaD/wITUvXA==</Content-MD5>
<BlobType>BlockBlob</BlobType>
<LeaseStatus>unlocked</LeaseStatus>
<LeaseState>available</LeaseState>
<ServerEncrypted>true</ServerEncrypted>
<AccessTier>Hot</AccessTier>
<AccessTierInferred>true</AccessTierInferred>
<AccessTierChangeTime>Thu, 15 May 2025 08:02:20 GMT</AccessTierChangeTime>
</Properties>
</Blob>
</Blobs>
<NextMarker>dir/azure_files/test_100.txt</NextMarker>
</EnumerationResults>`
URLObj := buildTestObjectStorageURLObj("http://example.com/devstoreaccount1/zeno?restype=container&comp=list&maxresults=1", xmlBody, http.Header{"Server": []string{"Windows-Azure-Blob/1.0"}})
outlinks, err := azure(URLObj)
outlinks2, err2 := ObjectStorage(URLObj) // indirectly call, for coverage testing
if err != nil || err2 != nil {
t.Fatalf("azure() returned unexpected error: %v, err2: %v", err, err2)
}

if len(outlinks) != 2 || len(outlinks2) != 2 {
t.Fatalf("expected 2 outlinks, got %d and %d", len(outlinks), len(outlinks2))
}

expectedOutlinks := []string{
"http://example.com/devstoreaccount1/zeno?comp=list&marker=dir%2Fazure_files%2Ftest_100.txt&maxresults=1&restype=container",
"http://example.com/devstoreaccount1/zeno/dir/azure_files/test_100.txt",
}

for i, outlink := range outlinks {
outlink.Parse()
if outlink.String() != expectedOutlinks[i] {
t.Errorf("expected %s, got %s", expectedOutlinks[i], outlink.String())
}
}
})

t.Run("invalid XML", func(t *testing.T) {
xmlBody := `<EnumerationResults><BadTag`
URLObj := buildTestObjectStorageURLObj("http://example.com/devstoreaccount1/zeno?restype=container&comp=list&maxresults=1", xmlBody, http.Header{"Server": []string{"Windows-Azure-Blob/1.0"}})
_, err := azure(URLObj)
if err == nil {
t.Fatalf("expected error for invalid XML, got none")
}

if !strings.Contains(err.Error(), "error decoding AzureBlobEnumerationResults XML") {
t.Fatalf("expected error 'error decoding AzureBlobEnumerationResults XML', got %v", err)
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@ import (
"encoding/xml"
"fmt"
"net/url"
"strings"

"github.com/internetarchive/Zeno/internal/pkg/utils"
"github.com/internetarchive/Zeno/pkg/models"
)

var validS3Servers = []string{
var s3CompatibleServers = []string{
"AmazonS3",
"WasabiS3",
"UploadServer", // Google Cloud Storage
"Windows-Azure-Blob",
"AliyunOSS", // Alibaba Object Storage Service
"UploadServer", // Google Cloud Storage, https://cloud.google.com/storage/docs/listing-objects#list-objects-xml
"AliyunOSS", // Alibaba Object Storage Service
}

// S3ListBucketResult represents the XML structure of an S3 bucket listing
Expand All @@ -40,54 +37,9 @@ type CommonPrefix struct {
Prefix []string `xml:"Prefix"`
}

// IsS3 checks if the response is from an S3 server
func IsS3(URL *models.URL) bool {
return utils.StringContainsSliceElements(URL.GetResponse().Header.Get("Server"), validS3Servers) &&
strings.Contains(URL.GetResponse().Header.Get("Content-Type"), "xml")
}

// S3 decides which helper to call based on the query param: old style (no list-type=2) vs. new style (list-type=2)
func S3(URL *models.URL) ([]*models.URL, error) {
defer URL.RewindBody()

// Decode XML result
var result S3ListBucketResult
if err := xml.NewDecoder(URL.GetBody()).Decode(&result); err != nil {
return nil, fmt.Errorf("error decoding S3 XML: %v", err)
}

// Prepare base data
reqURL := URL.GetRequest().URL
listType := reqURL.Query().Get("list-type")

// Build https://<host> as the base for direct file links
baseStr := fmt.Sprintf("https://%s", reqURL.Host)
parsedBase, err := url.Parse(baseStr)
if err != nil {
return nil, fmt.Errorf("invalid base URL: %v", err)
}

var outlinkStrings []string

// Delegate to old style or new style
if listType != "2" {
// Old style S3 listing, uses marker
outlinkStrings = s3Legacy(reqURL, parsedBase, result)
} else {
// New style listing (list-type=2), uses continuation token and/or CommonPrefixes
outlinkStrings = s3V2(reqURL, parsedBase, result)
}

// Convert from []string -> []*models.URL
var outlinks []*models.URL
for _, link := range outlinkStrings {
outlinks = append(outlinks, &models.URL{Raw: link})
}
return outlinks, nil
}

// s3Legacy handles the old ListObjects style, which uses `marker` for pagination.
func s3Legacy(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) []string {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html
func s3Legacy(reqURL, baseURL *url.URL, result S3ListBucketResult) []string {
var outlinks []string

// If there are objects in <Contents>, create a "next page" URL using `marker`
Expand All @@ -103,8 +55,7 @@ func s3Legacy(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) [
// Produce direct file links for each object
for _, obj := range result.Contents {
if obj.Size > 0 {
fileURL := *parsedBase
fileURL.Path += "/" + obj.Key
fileURL := baseURL.JoinPath(obj.Key)
outlinks = append(outlinks, fileURL.String())
}
}
Expand All @@ -113,7 +64,8 @@ func s3Legacy(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) [
}

// s3V2 handles the new ListObjectsV2 style, which uses `continuation-token` and can return CommonPrefixes.
func s3V2(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) []string {
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
func s3V2(reqURL, baseURL *url.URL, result S3ListBucketResult) []string {
var outlinks []string

// If we have common prefixes => "subfolders"
Expand All @@ -129,11 +81,11 @@ func s3V2(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) []str
}
}
} else {

// Otherwise, we have actual objects in <Contents>
for _, obj := range result.Contents {
if obj.Size > 0 {
fileURL := *parsedBase
fileURL.Path += "/" + obj.Key
fileURL := *baseURL.JoinPath(obj.Key)
outlinks = append(outlinks, fileURL.String())
}
}
Expand All @@ -150,3 +102,37 @@ func s3V2(reqURL *url.URL, parsedBase *url.URL, result S3ListBucketResult) []str

return outlinks
}

// s3Compatible decides which helper to call based on the query param: old style (no list-type=2) vs. new style (list-type=2)
func s3Compatible(URL *models.URL) ([]*models.URL, error) {
defer URL.RewindBody()

// Decode XML result
var result S3ListBucketResult
if err := xml.NewDecoder(URL.GetBody()).Decode(&result); err != nil {
return nil, fmt.Errorf("error decoding S3ListBucketResult XML: %w", err)
}

reqURL := URL.GetRequest().URL
listType := reqURL.Query().Get("list-type")

// Prepare base url
baseURL := new(url.URL)
*baseURL = *reqURL
baseURL.RawQuery = ""
baseURL.ForceQuery = false
baseURL.Path = "/"

var outlinks []string

// Delegate to old style or new style
if listType != "2" {
// Old style S3 listing, uses marker
outlinks = s3Legacy(reqURL, baseURL, result)
} else {
// New style listing (list-type=2), uses continuation token and/or CommonPrefixes
outlinks = s3V2(reqURL, baseURL, result)
}

return toURLs(outlinks), nil
}
Loading
Loading