I'm having an issue scheduling a job in Data Factory. I'm trying to set up an hourly scheduled job that executes the same script each hour with a different condition.
Consider that I have a bunch of Avro files spread across Azure Data Lake Store with the following pattern: /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}{date:MM}{date:dd}__{date:H}
New files are added to Data Lake Store each hour. To process each file only once, I decided to handle them with a U-SQL virtual file set column and a SyncTable which I created in Data Lake Store.
My query looks like the following.
DECLARE @file_set_path string = /Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H};
@result = EXTRACT [Id] long,
....
date DateTime
FROM @file_set_path
USING someextractor;
@rdate =
SELECT MAX(ProcessedDate) AS ProcessedDate
FROM dbo.SyncTable
WHERE EntityName== "SomeEntity";
@finalResult = SELECT [Id],... FROM @result
CROSS JOIN @rdate AS r
WHERE date >= r.ProcessedDate;
Since I can't use a rowset variable in a WHERE clause, I'm cross joining the single row with the set. However, even in this case U-SQL won't find the correct files and always returns the whole file set.
Is there any workaround or other approach?
I think this approach should work unless something is not quite right somewhere. Can you confirm the datatypes of the dbo.SyncTable table? Dump out @rdate and make sure the value you get there is what you expect.
I put together a simple demo which worked as expected. My copy of SyncTable had one record with the value of 01/01/2018:
@working =
SELECT *
FROM (
VALUES
( (int)1, DateTime.Parse("2017/12/31") ),
( (int)2, DateTime.Parse("2018/01/01") ),
( (int)3, DateTime.Parse("2018/02/01") )
) AS x ( id, someDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM dbo.SyncTable;
//@output =
// SELECT *
// FROM @rdate;
@output =
SELECT *, (w.someDate - r.maxDate).ToString() AS diff
FROM @working AS w
CROSS JOIN
@rdate AS r
WHERE w.someDate >= r.maxDate;
OUTPUT @output TO "/output/output.csv"
USING Outputters.Csv();
I did try this with a filepath (full script here). The thing to remember is the custom date format H represents the hour as a number from 0 to 23. If your SyncTable date does not have a time component to it when you insert it, it will default to midnight (0), meaning the whole day will be collected. Your file structure should look something like this according to your pattern:
"D:\Data Lake\USQLDataRoot\Data\SomeEntity\2017\12\31\SomeEntity_2017_12_31__8\test.csv"
I note your filepath has underscores in the second section and a double underscore before the hour section (which will be between 0 and 23, so a single digit for hours 0 to 9). I also notice your file set path does not have a file type or quotes; I've used test.csv in my tests. My results:

Basically I think the approach will work, but there is something not quite right, maybe in your file structure, the value in your SyncTable, the datatype etc. You need to go over the details, dump out intermediate values to check until you find the problem.
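As a rough way to sanity-check the file structure outside U-SQL, the short Python sketch below expands the underscore version of the pattern for a given timestamp. The helper name expand_pattern is hypothetical, and the sketch only mimics the {date:...} tokens; it is not how U-SQL resolves file sets. It also shows why a SyncTable value with no time component points at hour 0.

```python
from datetime import datetime


def expand_pattern(ts: datetime) -> str:
    """Hypothetical helper: build the path the pattern would match for ts.

    {date:yyyy}/{date:MM}/{date:dd} are zero padded; {date:H} is the hour
    from 0 to 23 without padding.
    """
    return (
        f"/Data/SomeEntity/{ts:%Y}/{ts:%m}/{ts:%d}/"
        f"SomeEntity_{ts:%Y}_{ts:%m}_{ts:%d}__{ts.hour}"
    )


# A date-only value parses to midnight, so the hour part becomes 0.
date_only = datetime.strptime("2018/01/01", "%Y/%m/%d")
# A value that carries an hour points at that hour's file instead.
with_hour = datetime(2018, 1, 1, 13)

print(expand_pattern(date_only))
print(expand_pattern(with_hour))
```

Comparing these generated paths against what is actually in the store is one quick way to spot a mismatch in underscores, padding or the hour part.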
Doesn't the gist of wBob's full script resolve your issue? Here is a very slightly edited version of wBob's full script to address some of the issues you raised:
The ability to filter on SyncTable.
The last part of the pattern is a file name, not a folder. Sample file and structure: \Data\SomeEntity\2018\01\01\SomeEntity_2018_01_01__1
DECLARE @file_set_path string = @"/Data/SomeEntity/{date:yyyy}/{date:MM}/{date:dd}/SomeEntity_{date:yyyy}_{date:MM}_{date:dd}__{date:H}";
@input =
EXTRACT [Id] long,
date DateTime
FROM @file_set_path
USING Extractors.Text();
// in lieu of creating actual table
@syncTable =
SELECT * FROM
( VALUES
( "SomeEntity", new DateTime(2018,01,01,01,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,01,00,00) ),
( "SomeEntity", new DateTime(2018,01,01,00,00,00) ),
( "AnotherEntity", new DateTime(2018,01,01,00,00,00) ),
( "SomeEntity", new DateTime(2017,12,31,23,00,00) ),
( "AnotherEntity", new DateTime(2017,12,31,23,00,00) )
) AS x ( EntityName, ProcessedDate );
@rdate =
SELECT MAX(ProcessedDate) AS maxDate
FROM @syncTable
WHERE EntityName== "SomeEntity";
@output =
SELECT *,
date.ToString() AS dateString
FROM @input AS i
CROSS JOIN
@rdate AS r
WHERE i.date >= r.maxDate;
OUTPUT @output
TO "/output/output.txt"
ORDER BY Id
USING Outputters.Text(quoting:false);
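To make the expected row-level result of the script above easier to check, here is a small Python model of the same logic: take the maximum ProcessedDate for "SomeEntity", then keep only rows whose date is on or after it. The file_dates list is an assumption made up for illustration. The sketch says nothing about which files U-SQL actually reads; it only models which rows should survive the WHERE clause.

```python
from datetime import datetime

# Same rows as the @syncTable stand-in in the script above.
sync_table = [
    ("SomeEntity", datetime(2018, 1, 1, 1)),
    ("AnotherEntity", datetime(2018, 1, 1, 1)),
    ("SomeEntity", datetime(2018, 1, 1, 0)),
    ("AnotherEntity", datetime(2018, 1, 1, 0)),
    ("SomeEntity", datetime(2017, 12, 31, 23)),
    ("AnotherEntity", datetime(2017, 12, 31, 23)),
]

# Assumed values of the virtual "date" column, one per hourly file.
file_dates = [
    datetime(2017, 12, 31, 23),
    datetime(2018, 1, 1, 0),
    datetime(2018, 1, 1, 1),
    datetime(2018, 1, 1, 2),
]

# Equivalent of: SELECT MAX(ProcessedDate) ... WHERE EntityName == "SomeEntity"
max_date = max(d for name, d in sync_table if name == "SomeEntity")

# Equivalent of: CROSS JOIN @rdate AS r WHERE i.date >= r.maxDate
selected = [d for d in file_dates if d >= max_date]

for d in selected:
    print(d.isoformat())
```

Note that >= also keeps the hour already recorded in the sync table. Whether that hour should be reprocessed depends on how SyncTable is updated, which the thread does not specify.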